Merge pull request #54422 from kssenii/s3-queue-fixes

Fixes for storage `S3Queue`
Kseniia Sumarokova 2023-10-18 21:51:51 +02:00 committed by GitHub
commit 4e0122a299
35 changed files with 2590 additions and 1675 deletions

View File

@ -24,12 +24,15 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
[after_processing = 'keep',]
[keeper_path = '',]
[s3queue_loading_retries = 0,]
[s3queue_processing_threads_num = 1,]
[s3queue_enable_logging_to_s3queue_log = 0,]
[s3queue_polling_min_timeout_ms = 1000,]
[s3queue_polling_max_timeout_ms = 10000,]
[s3queue_polling_backoff_ms = 0,]
[s3queue_tracked_files_limit = 1000,]
[s3queue_tracked_file_ttl_sec = 0,]
[s3queue_polling_size = 50,]
[s3queue_tracked_files_limit = 1000,]
[s3queue_cleanup_interval_min_ms = 10000,]
[s3queue_cleanup_interval_max_ms = 30000,]
```
**Engine parameters**
@ -46,7 +49,7 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'ordered';
mode = 'unordered';
```
Using named collections:
@ -109,6 +112,18 @@ Possible values:
Default value: `0`.
### s3queue_processing_threads_num {#processing_threads_num}
Number of threads to perform processing. Applies only to `Unordered` mode.
Default value: `1`.
### s3queue_enable_logging_to_s3queue_log {#enable_logging_to_s3queue_log}
Enable logging to `system.s3queue_log`.
Default value: `0`.
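For example, per-table logging can be enabled together with parallel processing. This is only an illustrative sketch: the bucket URL and the column list are placeholders, not taken from this PR.
``` sql
CREATE TABLE s3queue_logged (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*', 'CSV')
SETTINGS
    mode = 'unordered',
    s3queue_processing_threads_num = 4,
    s3queue_enable_logging_to_s3queue_log = 1;
```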
### s3queue_polling_min_timeout_ms {#polling_min_timeout_ms}
Minimal timeout before next polling (in milliseconds).
@ -161,18 +176,17 @@ Possible values:
Default value: `0`.
### s3queue_polling_size {#polling_size}
Maximum files to fetch from S3 with SELECT or in background task.
Engine takes files for processing from S3 in batches.
We limit the batch size to increase concurrency if multiple table engines with the same `keeper_path` consume files from the same path.
Possible values:
- Positive integer.
Default value: `50`.
### s3queue_cleanup_interval_min_ms {#cleanup_interval_min_ms}
For 'Ordered' mode. Defines a minimum boundary for reschedule interval for a background task, which is responsible for maintaining tracked file TTL and maximum tracked files set.
Default value: `10000`.
### s3queue_cleanup_interval_max_ms {#cleanup_interval_max_ms}
For 'Ordered' mode. Defines a maximum boundary for reschedule interval for a background task, which is responsible for maintaining tracked file TTL and maximum tracked files set.
Default value: `30000`.
## S3-related Settings {#s3-settings}
@ -227,6 +241,118 @@ For more information about virtual columns see [here](../../../engines/table-eng
Constructions with `{}` are similar to the [remote](../../../sql-reference/table-functions/remote.md) table function.
:::note
If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::
## Limitations {#limitations}
1. Duplicated rows can appear as a result of the following (one way to deduplicate them in the destination table is sketched after this list):
- an exception that occurs during parsing in the middle of file processing, when retries are enabled via `s3queue_loading_retries`;
- `S3Queue` being configured on multiple servers pointing to the same path in ZooKeeper, when the Keeper session expires before one server manages to commit a processed file, so another server may take over processing of a file that was already partially or fully processed by the first server;
- abnormal server termination.
2. If `S3Queue` is configured on multiple servers pointing to the same path in ZooKeeper and `Ordered` mode is used, then `s3queue_loading_retries` will not work. This will be fixed soon.
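One way to tolerate such duplicates is to deduplicate in the destination table, for example by writing through a materialized view into a `ReplacingMergeTree` table. This is only an illustrative sketch, not part of the engine: `deduplicated_data` and `s3queue_consumer` are hypothetical names, and `s3queue_engine_table` refers to the example table defined earlier on this page.
``` sql
-- Destination table: rows with the same sorting key collapse on background merges
-- (or can be collapsed explicitly with SELECT ... FINAL / OPTIMIZE ... FINAL).
CREATE TABLE deduplicated_data (name String, value UInt32)
ENGINE = ReplacingMergeTree
ORDER BY (name, value);

-- Materialized view that moves data from the S3Queue table into the destination table.
CREATE MATERIALIZED VIEW s3queue_consumer TO deduplicated_data AS
SELECT name, value
FROM s3queue_engine_table;
```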
## Introspection {#introspection}
For introspection, use the stateless `system.s3queue` table and the persistent `system.s3queue_log` table.
1. `system.s3queue`. This table is not persistent and shows the in-memory state of `S3Queue`: which files are currently being processed, and which files have been processed or failed.
``` sql
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue
(
`database` String,
`table` String,
`file_name` String,
`rows_processed` UInt64,
`status` String,
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
    `ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = SystemS3Queue
COMMENT 'SYSTEM TABLE is built on the fly.' │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
Example:
``` sql
SELECT *
FROM system.s3queue
Row 1:
──────
zookeeper_path: /clickhouse/s3queue/25ea5621-ae8c-40c7-96d0-cec959c5ab88/3b3f66a1-9866-4c2e-ba78-b6bfa154207e
file_name: wikistat/original/pageviews-20150501-030000.gz
rows_processed: 5068534
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:31
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5068534,'SelectedBytes':198132283,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':2480,'S3QueueSetFileProcessedMicroseconds':9985,'S3QueuePullMicroseconds':273776,'LogTest':17}
exception:
```
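For instance, files that are currently failing can be listed with a query like this (a sketch that assumes the columns shown in the table definition above):
``` sql
SELECT file_name, status, exception
FROM system.s3queue
WHERE status = 'Failed';
```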
2. `system.s3queue_log`. A persistent table that has the same information as `system.s3queue`, but for `processed` and `failed` files.
The table has the following structure:
``` sql
SHOW CREATE TABLE system.s3queue_log
Query id: 0ad619c3-0f2a-4ee4-8b40-c73d86e04314
┌─statement──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ CREATE TABLE system.s3queue_log
(
`event_date` Date,
`event_time` DateTime,
`table_uuid` String,
`file_name` String,
`rows_processed` UInt64,
`status` Enum8('Processed' = 0, 'Failed' = 1),
`processing_start_time` Nullable(DateTime),
`processing_end_time` Nullable(DateTime),
`ProfileEvents` Map(String, UInt64),
`exception` String
)
ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (event_date, event_time)
SETTINGS index_granularity = 8192 │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
To use `system.s3queue_log`, define its configuration in the server config file:
``` xml
<s3queue_log>
<database>system</database>
<table>s3queue_log</table>
</s3queue_log>
```
Example:
``` sql
SELECT *
FROM system.s3queue_log
Row 1:
──────
event_date: 2023-10-13
event_time: 2023-10-13 13:10:12
table_uuid:
file_name: wikistat/original/pageviews-20150501-020000.gz
rows_processed: 5112621
status: Processed
processing_start_time: 2023-10-13 13:09:48
processing_end_time: 2023-10-13 13:10:12
ProfileEvents: {'ZooKeeperTransactions':3,'ZooKeeperGet':2,'ZooKeeperMulti':1,'SelectedRows':5112621,'SelectedBytes':198577687,'ContextLock':1,'S3QueueSetFileProcessingMicroseconds':1934,'S3QueueSetFileProcessedMicroseconds':17063,'S3QueuePullMicroseconds':5841972,'LogTest':17}
exception:
```
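For example, recently failed files can be pulled from the log with a query like this (a sketch that assumes the structure shown above):
``` sql
SELECT event_time, file_name, exception
FROM system.s3queue_log
WHERE status = 'Failed'
ORDER BY event_time DESC
LIMIT 10;
```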

View File

@ -391,7 +391,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZOK || code == Coordination::Error::ZNODEEXISTS)
return std::make_shared<zkutil::EphemeralNodeHolder>(current_worker_path, *zookeeper, false, false, description);
return zkutil::EphemeralNodeHolder::existing(current_worker_path, *zookeeper);
if (code == Coordination::Error::ZBADVERSION)
{

View File

@ -530,6 +530,13 @@ The server successfully detected this situation and will download merged part fr
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
\
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent on background cleanup of the tracked file set (max set size or TTL)")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
\
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
M(IOUringSQEsResubmits, "Total number of io_uring SQE resubmits performed") \
@ -589,9 +596,14 @@ Timer::Timer(Counters & counters_, Event timer_event_, Event counter_event, Reso
counters.increment(counter_event);
}
UInt64 Timer::get()
{
return watch.elapsedNanoseconds() / static_cast<UInt64>(resolution);
}
void Timer::end()
{
counters.increment(timer_event, watch.elapsedNanoseconds() / static_cast<UInt64>(resolution));
counters.increment(timer_event, get());
watch.reset();
}

View File

@ -41,6 +41,7 @@ namespace ProfileEvents
~Timer() { end(); }
void cancel() { watch.reset(); }
void end();
UInt64 get();
private:
Counters & counters;

View File

@ -10,6 +10,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>

View File

@ -27,6 +27,7 @@
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(S3QueueLogElement) \
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \

View File

@ -644,11 +644,18 @@ class EphemeralNodeHolder
public:
using Ptr = std::shared_ptr<EphemeralNodeHolder>;
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool sequential, const std::string & data)
EphemeralNodeHolder(const std::string & path_, ZooKeeper & zookeeper_, bool create, bool try_create, bool sequential, const std::string & data)
: path(path_), zookeeper(zookeeper_)
{
if (create)
{
path = zookeeper.create(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
need_remove = created = true;
}
else if (try_create)
{
need_remove = created = Coordination::Error::ZOK == zookeeper.tryCreate(path, data, sequential ? CreateMode::EphemeralSequential : CreateMode::Ephemeral);
}
}
std::string getPath() const
@ -656,19 +663,32 @@ public:
return path;
}
bool isCreated() const
{
return created;
}
static Ptr create(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, data);
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, false, data);
}
static Ptr tryCreate(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
auto node = std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, true, false, data);
if (node->isCreated())
return node;
return nullptr;
}
static Ptr createSequential(const std::string & path, ZooKeeper & zookeeper, const std::string & data = "")
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, true, data);
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, true, false, true, data);
}
static Ptr existing(const std::string & path, ZooKeeper & zookeeper)
{
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, "");
return std::make_shared<EphemeralNodeHolder>(path, zookeeper, false, false, false, "");
}
void setAlreadyRemoved()
@ -702,6 +722,7 @@ private:
ZooKeeper & zookeeper;
CurrentMetrics::Increment metric_increment{CurrentMetrics::EphemeralNode};
bool need_remove = true;
bool created = false;
};
using EphemeralNodeHolderPtr = EphemeralNodeHolder::Ptr;

View File

@ -108,6 +108,7 @@ class IColumn;
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(Bool, s3queue_enable_logging_to_s3queue_log, false, "Enable writing to system.s3queue_log. The value can be overwritten per table with table settings", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \

View File

@ -3588,6 +3588,15 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
return shared->system_logs->filesystem_cache_log;
}
std::shared_ptr<S3QueueLog> Context::getS3QueueLog() const
{
auto lock = getGlobalSharedLock();
if (!shared->system_logs)
return {};
return shared->system_logs->s3_queue_log;
}
std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
auto lock = getGlobalSharedLock();

View File

@ -105,6 +105,7 @@ class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class S3QueueLog;
class AsynchronousInsertLog;
class BackupLog;
class IAsynchronousReader;
@ -1041,6 +1042,7 @@ public:
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<S3QueueLog> getS3QueueLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;

View File

@ -0,0 +1,62 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/ProfileEventsExt.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/S3QueueLog.h>
namespace DB
{
NamesAndTypesList S3QueueLogElement::getNamesAndTypes()
{
auto status_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Processed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Processed)},
{"Failed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Failed)},
});
return {
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
{"table_uuid", std::make_shared<DataTypeString>()},
{"file_name", std::make_shared<DataTypeString>()},
{"rows_processed", std::make_shared<DataTypeUInt64>()},
{"status", status_datatype},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
{"exception", std::make_shared<DataTypeString>()},
};
}
void S3QueueLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(table_uuid);
columns[i++]->insert(file_name);
columns[i++]->insert(rows_processed);
columns[i++]->insert(status);
if (processing_start_time)
columns[i++]->insert(processing_start_time);
else
columns[i++]->insertDefault();
if (processing_end_time)
columns[i++]->insert(processing_end_time);
else
columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(counters_snapshot, columns[i++].get(), true);
columns[i++]->insert(exception);
}
}

View File

@ -0,0 +1,43 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <Core/NamesAndAliases.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/SystemLog.h>
namespace DB
{
struct S3QueueLogElement
{
time_t event_time{};
std::string table_uuid;
std::string file_name;
size_t rows_processed = 0;
enum class S3QueueStatus
{
Processed,
Failed,
};
S3QueueStatus status;
ProfileEvents::Counters::Snapshot counters_snapshot;
time_t processing_start_time;
time_t processing_end_time;
std::string exception;
static std::string name() { return "S3QueueLog"; }
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;
static const char * getCustomColumnList() { return nullptr; }
};
class S3QueueLog : public SystemLog<S3QueueLogElement>
{
using SystemLog<S3QueueLogElement>::SystemLog;
};
}

View File

@ -19,6 +19,7 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ZooKeeperLog.h>
#include <Interpreters/BackupLog.h>
#include <Parsers/ASTCreateQuery.h>
@ -289,6 +290,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log");
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log");
s3_queue_log = createSystemLog<S3QueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log");
if (query_log)
logs.emplace_back(query_log.get());
@ -329,6 +331,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
logs.emplace_back(asynchronous_insert_log.get());
if (backup_log)
logs.emplace_back(backup_log.get());
if (s3_queue_log)
logs.emplace_back(s3_queue_log.get());
try
{

View File

@ -50,6 +50,7 @@ class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class S3QueueLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
/// because SystemLog destruction makes insert query while flushing data into underlying tables
@ -70,6 +71,7 @@ struct SystemLogs
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<S3QueueLog> s3_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.

File diff suppressed because it is too large.

View File

@ -1,127 +1,171 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <filesystem>
#include <Core/Types.h>
#include <Core/SettingsEnums.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/ZooKeeper.h>
# include <Core/UUID.h>
# include <Interpreters/Context.h>
# include <Storages/StorageS3Settings.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
namespace DB
{
class StorageS3Queue;
struct S3QueueSettings;
class StorageS3Queue;
/**
* A class for managing S3Queue metadata in zookeeper, e.g.
* the following folders:
* - <path_to_metadata>/processing
* - <path_to_metadata>/processed
* - <path_to_metadata>/failed
*
* Depending on S3Queue processing mode (ordered or unordered)
* we can differently store metadata in /processed node.
*
* Implements caching of zookeeper metadata for faster responses.
* Cached part is located in LocalFileStatuses.
*
* In case of Unordered mode, if file TTL is enabled or a maximum tracked files limit is set,
* a background cleanup thread is started, which is responsible for maintaining them.
*/
class S3QueueFilesMetadata
{
public:
struct TrackedCollectionItem
class ProcessingNodeHolder;
using ProcessingNodeHolderPtr = std::shared_ptr<ProcessingNodeHolder>;
S3QueueFilesMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_);
~S3QueueFilesMetadata();
void setFileProcessed(ProcessingNodeHolderPtr holder);
void setFileFailed(ProcessingNodeHolderPtr holder, const std::string & exception_message);
struct FileStatus
{
TrackedCollectionItem() = default;
TrackedCollectionItem(const String & file_path_, UInt64 timestamp_, UInt64 retries_count_, const String & last_exception_)
: file_path(file_path_), timestamp(timestamp_), retries_count(retries_count_), last_exception(last_exception_) {}
String file_path;
UInt64 timestamp = 0;
UInt64 retries_count = 0;
String last_exception;
enum class State
{
Processing,
Processed,
Failed,
None
};
State state = State::None;
std::atomic<size_t> processed_rows = 0;
time_t processing_start_time = 0;
time_t processing_end_time = 0;
size_t retries = 0;
std::string last_exception;
ProfileEvents::Counters profile_counters;
std::mutex processing_lock;
std::mutex metadata_lock;
};
using FileStatusPtr = std::shared_ptr<FileStatus>;
using FileStatuses = std::unordered_map<std::string, FileStatusPtr>;
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
/// Set file as processing, if it is not already processed, failed or processing.
std::pair<ProcessingNodeHolderPtr, FileStatusPtr> trySetFileAsProcessing(const std::string & path);
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
FileStatusPtr getFileStatus(const std::string & path);
void setFilesProcessing(const Strings & file_paths);
void setFileProcessed(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
FileStatuses getFileStateses() const { return local_file_statuses.getAll(); }
S3FilesCollection getProcessedFailedAndProcessingFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
bool checkSettings(const S3QueueSettings & settings) const;
struct S3QueueCollection
{
public:
virtual ~S3QueueCollection() = default;
virtual String toString() const;
S3FilesCollection getFileNames();
virtual void parse(const String & collection_str) = 0;
protected:
TrackedFiles files;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
};
struct S3QueueProcessedCollection : public S3QueueCollection
{
public:
S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & collection_str) override;
void add(const String & file_name);
private:
const UInt64 max_size;
const UInt64 max_age;
};
struct S3QueueFailedCollection : S3QueueCollection
{
public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & collection_str) override;
bool add(const String & file_name, const String & exception_message);
S3FilesCollection getFileNames();
private:
UInt64 max_retries_count;
};
struct S3QueueProcessingCollection
{
public:
S3QueueProcessingCollection() = default;
void parse(const String & collection_str);
void add(const Strings & file_names);
void remove(const String & file_name);
String toString() const;
const S3FilesCollection & getFileNames() const { return files; }
private:
S3FilesCollection files;
};
void deactivateCleanupTask();
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
const UInt64 max_set_size;
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
const size_t min_cleanup_interval_ms;
const size_t max_cleanup_interval_ms;
const String zookeeper_processing_path;
const String zookeeper_processed_path;
const String zookeeper_failed_path;
const String zookeeper_lock_path;
const fs::path zookeeper_processing_path;
const fs::path zookeeper_processed_path;
const fs::path zookeeper_failed_path;
const fs::path zookeeper_cleanup_lock_path;
mutable std::mutex mutex;
Poco::Logger * log;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
std::atomic_bool shutdown = false;
BackgroundSchedulePool::TaskHolder task;
void removeProcessingFile(const String & file_path);
std::string getNodeName(const std::string & path);
zkutil::ZooKeeperPtr getZooKeeper() const;
void setFileProcessedForOrderedMode(ProcessingNodeHolderPtr holder);
void setFileProcessedForUnorderedMode(ProcessingNodeHolderPtr holder);
enum class SetFileProcessingResult
{
Success,
ProcessingByOtherNode,
AlreadyProcessed,
AlreadyFailed,
};
std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForOrderedMode(const std::string & path);
std::pair<SetFileProcessingResult, ProcessingNodeHolderPtr> trySetFileAsProcessingForUnorderedMode(const std::string & path);
struct NodeMetadata
{
std::string file_path;
UInt64 last_processed_timestamp = 0;
std::string last_exception;
UInt64 retries = 0;
std::string processing_id; /// For ephemeral processing node.
std::string toString() const;
static NodeMetadata fromString(const std::string & metadata_str);
};
NodeMetadata createNodeMetadata(const std::string & path, const std::string & exception = "", size_t retries = 0);
void cleanupThreadFunc();
void cleanupThreadFuncImpl();
struct LocalFileStatuses
{
FileStatuses file_statuses;
mutable std::mutex mutex;
FileStatuses getAll() const;
FileStatusPtr get(const std::string & filename, bool create);
bool remove(const std::string & filename, bool if_exists);
std::unique_lock<std::mutex> lock() const;
};
LocalFileStatuses local_file_statuses;
};
class S3QueueFilesMetadata::ProcessingNodeHolder
{
friend class S3QueueFilesMetadata;
public:
ProcessingNodeHolder(
const std::string & processing_id_,
const std::string & path_,
const std::string & zk_node_path_,
zkutil::ZooKeeperPtr zk_client_);
~ProcessingNodeHolder();
private:
bool remove(Coordination::Requests * requests = nullptr, Coordination::Responses * responses = nullptr);
zkutil::ZooKeeperPtr zk_client;
std::string path;
std::string zk_node_path;
std::string processing_id;
bool removed = false;
Poco::Logger * log;
};
}
#endif

View File

@ -0,0 +1,70 @@
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
S3QueueMetadataFactory & S3QueueMetadataFactory::instance()
{
static S3QueueMetadataFactory ret;
return ret;
}
S3QueueMetadataFactory::FilesMetadataPtr
S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
{
it = metadata_by_path.emplace(zookeeper_path, std::make_shared<S3QueueFilesMetadata>(fs::path(zookeeper_path), settings)).first;
}
else if (it->second.metadata->checkSettings(settings))
{
it->second.ref_count += 1;
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with the same `s3queue_zookeeper_path` "
"was already created but with different settings");
}
return it->second.metadata;
}
void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Metadata with zookeeper path {} does not exist", zookeeper_path);
if (--it->second.ref_count == 0)
{
try
{
auto zk_client = Context::getGlobalContextInstance()->getZooKeeper();
zk_client->tryRemove(it->first);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
metadata_by_path.erase(it);
}
}
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> S3QueueMetadataFactory::getAll()
{
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> result;
for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path)
result.emplace(zk_path, metadata_and_ref_count.metadata);
return result;
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
namespace DB
{
class S3QueueMetadataFactory final : private boost::noncopyable
{
public:
using FilesMetadataPtr = std::shared_ptr<S3QueueFilesMetadata>;
static S3QueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings);
void remove(const std::string & zookeeper_path);
std::unordered_map<std::string, FilesMetadataPtr> getAll();
private:
struct Metadata
{
explicit Metadata(std::shared_ptr<S3QueueFilesMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
std::shared_ptr<S3QueueFilesMetadata> metadata;
size_t ref_count = 0;
};
using MetadataByPath = std::unordered_map<std::string, Metadata>;
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}

View File

@ -1,8 +1,8 @@
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Common/Exception.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Common/Exception.h>
namespace DB

View File

@ -19,17 +19,16 @@ class ASTStorage;
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt64, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt64, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt64, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt64, s3queue_polling_backoff_ms, 0, "Polling backoff", 0) \
M(UInt64, s3queue_tracked_files_limit, 1000, "Max set size for tracking processed files in unordered mode in ZooKeeper", 0) \
M(UInt64, \
s3queue_tracked_file_ttl_sec, \
0, \
"Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", \
0) \
M(UInt64, s3queue_polling_size, 50, "Maximum files to fetch from S3 with SELECT", 0)
M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt32, s3queue_polling_backoff_ms, 1000, "Polling backoff", 0) \
M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \

View File

@ -1,59 +1,24 @@
#include <algorithm>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "IO/ParallelReadBuffer.h"
#include "Parsers/ASTCreateQuery.h"
#include "config.h"
#if USE_AWS_S3
# include <Common/isValidUTF8.h>
# include <Functions/FunctionsConversion.h>
# include <IO/S3/Requests.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/CurrentMetrics.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Common/parseGlobs.h>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/VirtualColumnUtils.h>
namespace CurrentMetrics
{
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
}
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
extern const Event S3QueuePullMicroseconds;
}
namespace DB
@ -62,148 +27,83 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int NOT_IMPLEMENTED;
}
StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_columns, context, nullptr, request_settings_))
StorageS3QueueSource::S3QueueKeyWithInfo::S3QueueKeyWithInfo(
const std::string & key_,
std::optional<S3::ObjectInfo> info_,
Metadata::ProcessingNodeHolderPtr processing_holder_,
FileStatusPtr file_status_)
: StorageS3Source::KeyWithInfo(key_, info_)
, processing_holder(processing_holder_)
, file_status(file_status_)
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
{
KeyWithInfo val = glob_iterator->next();
if (val.key.empty())
break;
keys_buf.push_back(val);
}
}
Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file)
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueFilesMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_)
: metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
{
for (const KeyWithInfo & val : keys_buf)
{
auto full_path = val.key;
if (exclude_keys.find(full_path) != exclude_keys.end())
{
LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list "
"(either already processed or failed to be processed)", val.key);
continue;
}
if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0))
continue;
if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED))
{
processing_keys.push_back(val);
}
else
{
break;
}
}
if (engine_mode == S3QueueMode::ORDERED)
{
std::sort(
processing_keys.begin(),
processing_keys.end(),
[](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; });
if (processing_keys.size() > max_poll_size)
{
processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end());
}
}
Strings keys;
for (const auto & key_info : processing_keys)
keys.push_back(key_info.key);
processing_keys.push_back(KeyWithInfo());
processing_iterator = processing_keys.begin();
return keys;
}
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next()
StorageS3QueueSource::KeyWithInfoPtr StorageS3QueueSource::FileIterator::next()
{
std::lock_guard lock(mutex);
if (processing_iterator != processing_keys.end())
while (!shutdown_called)
{
return *processing_iterator++;
}
KeyWithInfoPtr val = glob_iterator->next();
return KeyWithInfo();
if (!val || shutdown_called)
return {};
if (auto [processing_holder, processing_file_status] = metadata->trySetFileAsProcessing(val->key);
processing_holder && !shutdown_called)
{
return std::make_shared<S3QueueKeyWithInfo>(val->key, val->info, processing_holder, processing_file_status);
}
}
return {};
}
size_t StorageS3QueueSource::QueueGlobIterator::estimatedKeysCount()
size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
{
return keys_buf.size();
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimateKeysCount is not implemented");
}
StorageS3QueueSource::StorageS3QueueSource(
const ReadFromFormatInfo & info,
const String & format_,
String name_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket_,
const String & version_id_,
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
const size_t download_thread_num_)
: ISource(info.source_header)
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_)
: ISource(header_)
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(info.columns_description)
, request_settings(request_settings_)
, client(client_)
, files_metadata(files_metadata_)
, requested_virtual_columns(info.requested_virtual_columns)
, requested_columns(info.requested_columns)
, file_iterator(file_iterator_)
, action(action_)
, files_metadata(files_metadata_)
, internal_source(std::move(internal_source_))
, requested_virtual_columns(requested_virtual_columns_)
, shutdown_called(shutdown_called_)
, s3_queue_log(s3_queue_log_)
, storage_id(storage_id_)
, remove_file_func(remove_file_func_)
, log(&Poco::Logger::get("StorageS3QueueSource"))
{
internal_source = std::make_shared<StorageS3Source>(
info,
format_,
name_,
context_,
format_settings_,
max_block_size_,
request_settings_,
compression_hint_,
client_,
bucket_,
version_id_,
url_host_and_port,
file_iterator,
download_thread_num_,
false,
/* query_info */ std::nullopt);
reader = std::move(internal_source->reader);
if (reader)
{
reader_future = std::move(internal_source->reader_future);
}
}
StorageS3QueueSource::~StorageS3QueueSource()
@ -218,61 +118,87 @@ String StorageS3QueueSource::getName() const
Chunk StorageS3QueueSource::generate()
{
auto file_progress = getContext()->getFileProgressCallback();
while (true)
{
if (isCancelled() || !reader)
if (!reader)
break;
if (isCancelled())
{
if (reader)
reader->cancel();
reader->cancel();
break;
}
Chunk chunk;
bool success_in_pulling = false;
if (shutdown_called)
{
if (processed_rows_from_file)
{
/// We could delay shutdown until the files that already started processing before the shutdown are finished.
/// But if files are big and `s3queue_processing_threads_num` is not small, it can take a significant time.
/// Anyway, we cannot do anything in case of SIGTERM, so the destination table must support deduplication,
/// and here we rely on it as well.
LOG_WARNING(
log, "Shutdown called, {} rows are already processed, but file is not fully processed",
processed_rows_from_file);
}
break;
}
const auto * key_with_info = dynamic_cast<const S3QueueKeyWithInfo *>(&reader.getKeyWithInfo());
auto file_status = key_with_info->file_status;
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (s3 related counters are not saved). Why?
try
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueuePullMicroseconds);
Chunk chunk;
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
auto file_path = reader.getPath();
LOG_TEST(log, "Read {} rows from file: {}", chunk.getNumRows(), reader.getPath());
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
success_in_pulling = true;
file_status->processed_rows += chunk.getNumRows();
processed_rows_from_file += chunk.getNumRows();
VirtualColumnUtils::addRequestedPathAndFileVirtualsToChunk(chunk, requested_virtual_columns, reader.getPath());
return chunk;
}
}
catch (const Exception & e)
catch (...)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
files_metadata->setFileFailed(reader.getFile(), e.message());
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(reader.getFile());
files_metadata->setFileProcessed(reader.getFile());
return chunk;
const auto message = getCurrentExceptionMessage(true);
LOG_ERROR(log, "Got an error while pulling chunk. Will set file {} as failed. Error: {} ", reader.getFile(), message);
files_metadata->setFileFailed(key_with_info->processing_holder, message);
appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, false);
throw;
}
files_metadata->setFileProcessed(key_with_info->processing_holder);
applyActionAfterProcessing(reader.getFile());
assert(reader_future.valid());
appendLogElement(reader.getFile(), *file_status, processed_rows_from_file, true);
file_status.reset();
processed_rows_from_file = 0;
if (shutdown_called)
{
LOG_INFO(log, "Shutdown was called, stopping sync");
break;
}
chassert(reader_future.valid());
reader = reader_future.get();
if (!reader)
break;
file_status = files_metadata->getFileStatus(reader.getFile());
/// Even if the task is finished, the thread may not be freed in the pool yet.
/// So wait until it is freed before scheduling a new task.
internal_source->create_reader_pool.wait();
@ -282,35 +208,42 @@ Chunk StorageS3QueueSource::generate()
return {};
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
{
switch (action)
{
case S3QueueAction::DELETE:
deleteProcessedObject(file_path);
{
assert(remove_file_func);
remove_file_func(path);
break;
}
case S3QueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
void StorageS3QueueSource::appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed)
{
LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket);
if (!s3_queue_log)
return;
S3::DeleteObjectRequest request;
request.WithKey(file_path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
S3QueueLogElement elem{};
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", file_path);
std::lock_guard lock(file_status_.metadata_lock);
elem = S3QueueLogElement
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.file_name = filename,
.rows_processed = processed_rows,
.status = processed ? S3QueueLogElement::S3QueueStatus::Processed : S3QueueLogElement::S3QueueStatus::Failed,
.counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(),
.processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time,
.exception = file_status_.last_exception,
};
}
s3_queue_log->add(std::move(elem));
}
}

View File

@ -2,125 +2,101 @@
#include "config.h"
#if USE_AWS_S3
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Processors/ISource.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/StorageS3.h>
#include <Interpreters/S3QueueLog.h>
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/prepareReadingFromFormat.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Poco/URI.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/logger_useful.h>
namespace Poco { class Logger; }
namespace DB
{
class StorageS3QueueSource : public ISource, WithContext
{
public:
using IIterator = StorageS3Source::IIterator;
using DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeysWithInfo = StorageS3Source::KeysWithInfo;
using KeyWithInfo = StorageS3Source::KeyWithInfo;
class QueueGlobIterator : public IIterator
using KeyWithInfoPtr = StorageS3Source::KeyWithInfoPtr;
using GlobIterator = StorageS3Source::DisclosedGlobIterator;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using RemoveFileFunc = std::function<void(std::string)>;
using FileStatusPtr = S3QueueFilesMetadata::FileStatusPtr;
using Metadata = S3QueueFilesMetadata;
struct S3QueueKeyWithInfo : public StorageS3Source::KeyWithInfo
{
S3QueueKeyWithInfo(
const std::string & key_,
std::optional<S3::ObjectInfo> info_,
Metadata::ProcessingNodeHolderPtr processing_holder_,
FileStatusPtr file_status_);
Metadata::ProcessingNodeHolderPtr processing_holder;
FileStatusPtr file_status;
};
class FileIterator : public IIterator
{
public:
QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const NamesAndTypesList & virtual_columns,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});
FileIterator(std::shared_ptr<S3QueueFilesMetadata> metadata_, std::unique_ptr<GlobIterator> glob_iterator_, std::atomic<bool> & shutdown_called_);
KeyWithInfo next() override;
Strings
filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file = "");
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
UInt64 max_poll_size;
KeysWithInfo keys_buf;
KeysWithInfo processing_keys;
mutable std::mutex mutex;
std::unique_ptr<DisclosedGlobIterator> glob_iterator;
std::vector<KeyWithInfo>::iterator processing_iterator;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSourceIterator");
const std::shared_ptr<S3QueueFilesMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
std::mutex mutex;
};
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
StorageS3QueueSource(
const ReadFromFormatInfo & info,
const String & format,
String name_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket,
const String & version_id,
const String & url_host_and_port,
std::shared_ptr<IIterator> file_iterator_,
const Block & header_,
std::unique_ptr<StorageS3Source> internal_source_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
size_t download_thread_num);
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
const StorageID & storage_id_);
~StorageS3QueueSource() override;
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
String getName() const override;
Chunk generate() override;
private:
String name;
String bucket;
String version_id;
String format;
ColumnsDescription columns_desc;
S3Settings::RequestSettings request_settings;
std::shared_ptr<const S3::Client> client;
const String name;
const S3QueueAction action;
const std::shared_ptr<S3QueueFilesMetadata> files_metadata;
const std::shared_ptr<StorageS3Source> internal_source;
const NamesAndTypesList requested_virtual_columns;
const std::atomic<bool> & shutdown_called;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const StorageID storage_id;
RemoveFileFunc remove_file_func;
Poco::Logger * log;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
using ReaderHolder = StorageS3Source::ReaderHolder;
ReaderHolder reader;
NamesAndTypesList requested_virtual_columns;
NamesAndTypesList requested_columns;
std::shared_ptr<IIterator> file_iterator;
const S3QueueAction action;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource");
std::future<ReaderHolder> reader_future;
size_t processed_rows_from_file = 0;
mutable std::mutex mutex;
std::shared_ptr<StorageS3Source> internal_source;
void deleteProcessedObject(const String & file_path);
void applyActionAfterProcessing(const String & file_path);
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueFilesMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
};
}

View File

@ -2,12 +2,12 @@
#if USE_AWS_S3
# include <Poco/JSON/JSON.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/StorageS3.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/StorageS3.h>
namespace DB
@ -18,13 +18,17 @@ namespace ErrorCodes
extern const int METADATA_MISMATCH;
}
S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings)
S3QueueTableMetadata::S3QueueTableMetadata(
const StorageS3::Configuration & configuration,
const S3QueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
columns = storage_metadata.getColumns().toString();
}
@ -36,6 +40,7 @@ String S3QueueTableMetadata::toString() const
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
json.set("format_name", format_name);
json.set("columns", columns);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
@ -52,6 +57,7 @@ void S3QueueTableMetadata::read(const String & metadata_str)
s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
format_name = json->getValue<String>("format_name");
columns = json->getValue<String>("columns");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)

View File

@ -2,9 +2,9 @@
#if USE_AWS_S3
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/StorageS3.h>
# include <base/types.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/StorageS3.h>
#include <base/types.h>
namespace DB
{
@ -18,13 +18,14 @@ class ReadBuffer;
struct S3QueueTableMetadata
{
String format_name;
String columns;
String after_processing;
String mode;
UInt64 s3queue_tracked_files_limit;
UInt64 s3queue_tracked_file_ttl_sec;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings);
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings, const StorageInMemoryMetadata & storage_metadata);
void read(const String & metadata_str);
static S3QueueTableMetadata parse(const String & metadata_str);

View File

@ -1,83 +1,102 @@
#include "config.h"
#if USE_AWS_S3
#include <Common/ProfileEvents.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <filesystem>
# include <Databases/DatabaseReplicated.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteHelpers.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Common/ProfileEvents.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/isValidUTF8.h>
# include "IO/ParallelReadBuffer.h"
# include <Functions/FunctionsConversion.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageFactory.h>
# include <Storages/StorageMaterializedView.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/prepareReadingFromFormat.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/parseGlobs.h>
# include <filesystem>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
# include <QueryPipeline/Pipe.h>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_NOT_ALLOWED;
extern const int REPLICA_ALREADY_EXISTS;
extern const int INCOMPATIBLE_COLUMNS;
}
namespace
{
bool containsGlobs(const S3::URI & url)
{
return url.key.find_first_of("*?{") != std::string::npos;
}
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const S3QueueSettings & s3queue_settings)
{
std::string zk_path_prefix = settings.s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings.keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings.keeper_path.value;
}
else
{
auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id.database_name)->getUUID();
result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id.uuid);
}
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, Poco::Logger * log)
{
if (s3queue_settings.mode == S3QueueMode::ORDERED && s3queue_settings.s3queue_processing_threads_num > 1)
{
LOG_WARNING(log, "Parallel processing is not yet supported for Ordered mode");
s3queue_settings.s3queue_processing_threads_num = 1;
}
if (!s3queue_settings.s3queue_processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
{
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
}
if (s3queue_settings.s3queue_cleanup_interval_min_ms > s3queue_settings.s3queue_cleanup_interval_max_ms)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
}
}
}
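The helpers above decide where a table keeps its coordination data: an explicit `keeper_path` is appended to `s3queue_default_zookeeper_path`, otherwise the path is derived from the database and table UUIDs. A hedged sketch of what that means at DDL level (bucket, table name and setting values here are illustrative, not taken from this commit):

```sql
-- Illustrative only: an explicit keeper_path wins over the UUID-based default.
CREATE TABLE s3queue_demo (name String, value UInt32)
ENGINE = S3Queue('https://example-bucket.s3.amazonaws.com/data/*', 'CSV')
SETTINGS mode = 'unordered',
         keeper_path = 's3queue/demo';
-- With the default s3queue_default_zookeeper_path of '/', metadata lives under
-- '/s3queue/demo'; without keeper_path it would be '/<database_uuid>/<table_uuid>'.
```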
StorageS3Queue::StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
@ -87,79 +106,80 @@ StorageS3Queue::StorageS3Queue(
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_)
std::optional<FormatSettings> format_settings_)
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, after_processing(s3queue_settings->after_processing)
, files_metadata(S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings))
, configuration{configuration_}
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, format_settings(format_settings_)
, partition_by(partition_by_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
{
if (configuration.url.key.ends_with('/'))
{
configuration.url.key += '*';
if (!withGlobs())
}
else if (!containsGlobs(configuration.url))
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
std::string zk_path_prefix = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings->keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings->keeper_path.value;
}
else
{
auto database_uuid = DatabaseCatalog::instance().getDatabase(table_id_.database_name)->getUUID();
result_zk_path = fs::path(zk_path_prefix) / toString(database_uuid) / toString(table_id_.uuid);
}
zk_path = zkutil::extractZooKeeperPath(result_zk_path, true/* check_starts_with_slash */, log);
LOG_INFO(log, "Using zookeeper path: {}", zk_path);
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), log);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
configuration.update(context_);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())
{
auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
{
storage_metadata.setColumns(columns_);
}
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
createOrCheckMetadata(storage_metadata);
setInMemoryMetadata(storage_metadata);
auto metadata_snapshot = getInMemoryMetadataPtr();
const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
if (!is_first_replica)
{
checkTableStructure(zk_path, metadata_snapshot);
}
files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
virtual_columns = VirtualColumnUtils::getPathAndFileVirtualsForStorage(storage_metadata.getSampleBlock().getNamesAndTypesList());
task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
}
bool StorageS3Queue::supportsSubcolumns() const
void StorageS3Queue::startup()
{
return true;
if (task)
task->activateAndSchedule();
}
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
{
task->deactivate();
}
if (files_metadata)
{
files_metadata->deactivateCleanupTask();
files_metadata.reset();
}
}
void StorageS3Queue::drop()
{
S3QueueMetadataFactory::instance().remove(zk_path);
}
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
@ -174,83 +194,70 @@ Pipe StorageS3Queue::read(
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
size_t /* num_streams */)
size_t num_streams)
{
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. "
"To enable use setting `stream_like_engine_allow_direct_select`");
}
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
"Cannot read from {} with attached materialized views", getName());
}
auto query_configuration = updateConfigurationAndGetCopy(local_context);
Pipes pipes;
const size_t adjusted_num_streams = std::min<size_t>(num_streams, s3queue_settings->s3queue_processing_threads_num);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
auto file_iterator = createFileIterator(local_context, query_info.query);
for (size_t i = 0; i < adjusted_num_streams; ++i)
pipes.emplace_back(createSource(file_iterator, column_names, storage_snapshot, max_block_size, local_context));
return Pipe::unitePipes(std::move(pipes));
}
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
size_t max_block_size,
ContextPtr local_context)
{
auto configuration_snapshot = updateConfigurationAndGetCopy(local_context);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context), getVirtuals());
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
return Pipe(std::make_shared<StorageS3QueueSource>(
read_from_format_info,
configuration.format,
getName(),
local_context,
format_settings,
auto internal_source = std::make_unique<StorageS3Source>(
read_from_format_info, configuration.format, getName(), local_context, format_settings,
max_block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
}
configuration_snapshot.request_settings,
configuration_snapshot.compression_method,
configuration_snapshot.client,
configuration_snapshot.url.bucket,
configuration_snapshot.url.version_id,
configuration_snapshot.url.uri.getHost() + std::to_string(configuration_snapshot.url.uri.getPort()),
file_iterator, local_context->getSettingsRef().max_download_threads, false, /* query_info */ std::nullopt);
SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
}
void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList StorageS3Queue::getVirtuals() const
{
return virtual_columns;
}
bool StorageS3Queue::supportsPartitionBy() const
{
return true;
}
void StorageS3Queue::startup()
{
if (task)
task->holder->activateAndSchedule();
}
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
auto file_deleter = [this, bucket = configuration_snapshot.url.bucket, client = configuration_snapshot.client](const std::string & path)
{
task->stream_cancelled = true;
task->holder->deactivate();
}
}
size_t StorageS3Queue::getTableDependentCount() const
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
return DatabaseCatalog::instance().getDependentViews(table_id).size();
S3::DeleteObjectRequest request;
request.WithKey(path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", path);
}
};
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<StorageS3QueueSource>(
getName(), read_from_format_info.source_header, std::move(internal_source),
files_metadata, after_processing, file_deleter, read_from_format_info.requested_virtual_columns,
local_context, shutdown_called, s3_queue_log, getStorageID());
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
@ -280,41 +287,35 @@ bool StorageS3Queue::hasDependencies(const StorageID & table_id)
void StorageS3Queue::threadFunc()
{
bool reschedule = true;
if (shutdown_called)
return;
try
{
auto table_id = getStorageID();
auto dependencies_count = getTableDependentCount();
const size_t dependencies_count = DatabaseCatalog::instance().getDependentViews(getStorageID()).size();
if (dependencies_count)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled)
SCOPE_EXIT({ mv_attached.store(false); });
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
if (streamToViews())
{
if (!hasDependencies(table_id))
{
/// For this case, we can not wait for watch thread to wake up
reschedule = true;
break;
}
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
streamToViews();
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
reschedule = true;
break;
}
/// Reset the reschedule interval.
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
}
else
{
/// Increase the reschedule interval.
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
}
LOG_DEBUG(log, "Stopped streaming to {} attached views", dependencies_count);
}
else
{
LOG_TEST(log, "No attached dependencies");
}
}
catch (...)
@ -322,20 +323,14 @@ void StorageS3Queue::threadFunc()
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
if (reschedule && !shutdown_called)
if (!shutdown_called)
{
LOG_TRACE(log, "Reschedule S3 Queue thread func.");
/// Reschedule with backoff.
if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
task->holder->scheduleAfter(reschedule_processing_interval_ms);
LOG_TRACE(log, "Reschedule S3 Queue processing thread in {} ms", reschedule_processing_interval_ms);
task->scheduleAfter(reschedule_processing_interval_ms);
}
}
void StorageS3Queue::streamToViews()
bool StorageS3Queue::streamToViews()
{
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
@ -348,8 +343,6 @@ void StorageS3Queue::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
size_t block_size = 100;
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
@ -358,40 +351,31 @@ void StorageS3Queue::streamToViews()
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
auto block_io = interpreter.execute();
auto column_names = block_io.pipeline.getHeader().getNames();
auto file_iterator = createFileIterator(s3queue_context, nullptr);
// Create a stream for each consumer and join them in a union stream
Pipes pipes;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(
file_iterator, block_io.pipeline.getHeader().getNames(),
storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(getContext()), getVirtuals());
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
read_from_format_info,
configuration.format,
getName(),
s3queue_context,
format_settings,
block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
query_configuration.url.uri.getHost() + std::to_string(query_configuration.url.uri.getPort()),
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);
std::atomic_size_t rows = 0;
{
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
return rows > 0;
}
StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
@ -402,58 +386,43 @@ StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(Cont
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
{
std::lock_guard lock{zk_mutex};
if (!zk_client || zk_client->expired())
{
zk_client = getContext()->getZooKeeper();
zk_client->sync(zk_path);
}
return zk_client;
return getContext()->getZooKeeper();
}
bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
void StorageS3Queue::createOrCheckMetadata(const StorageInMemoryMetadata & storage_metadata)
{
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zk_path);
for (size_t i = 0; i < zk_create_table_retries; ++i)
for (size_t i = 0; i < 1000; ++i)
{
Coordination::Requests ops;
bool is_first_replica = true;
if (zookeeper->exists(zk_path + "/metadata"))
Coordination::Requests requests;
if (zookeeper->exists(zk_path / "metadata"))
{
if (!zookeeper->exists(zk_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path);
is_first_replica = false;
checkTableStructure(zk_path, storage_metadata);
}
else
{
String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
std::string metadata = S3QueueTableMetadata(configuration, *s3queue_settings, storage_metadata).toString();
requests.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processed", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "failed", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "processing", "", zkutil::CreateMode::Persistent));
requests.emplace_back(zkutil::makeCreateRequest(zk_path / "metadata", metadata, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path.string());
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, ops, responses);
zkutil::KeeperMultiException::check(code, requests, responses);
}
return is_first_replica;
return;
}
throw Exception(
@ -463,24 +432,20 @@ bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_
}
/** Verify that list of columns and table settings match those specified in ZK (/metadata).
* If not, throw an exception.
*/
void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata)
{
// Verify that list of columns and table settings match those specified in ZK (/metadata).
// If not, throw an exception.
auto zookeeper = getZooKeeper();
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
Coordination::Stat metadata_stat;
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata");
auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings, storage_metadata);
old_metadata.checkEquals(metadata_from_zk);
Coordination::Stat columns_stat;
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
auto columns_from_zk = ColumnsDescription::parse(metadata_from_zk.columns);
const ColumnsDescription & old_columns = storage_metadata.getColumns();
if (columns_from_zk != old_columns)
{
throw Exception(
@ -492,45 +457,12 @@ void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const
}
}
std::shared_ptr<StorageS3QueueSource::IIterator>
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
{
auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
*configuration.client,
configuration.url,
query,
virtual_columns,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);
auto zookeeper = getZooKeeper();
auto lock = files_metadata->acquireLock(zookeeper);
S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
Strings files_to_process;
if (s3queue_settings->mode == S3QueueMode::UNORDERED)
{
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
}
else
{
String max_processed_file = files_metadata->getMaxProcessedFile();
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
}
LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
files_metadata->setFilesProcessing(files_to_process);
return it;
}
void StorageS3Queue::drop()
{
auto zookeeper = getZooKeeper();
if (zookeeper->exists(zk_path))
zookeeper->removeRecursive(zk_path);
auto glob_iterator = std::make_unique<StorageS3QueueSource::GlobIterator>(
*configuration.client, configuration.url, query, virtual_columns, local_context,
/* read_keys */nullptr, configuration.request_settings);
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called);
}
void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
@ -540,11 +472,15 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
[](const StorageFactory::Arguments & args)
{
if (!args.attach && !args.getLocalContext()->getSettingsRef().allow_experimental_s3queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. You can enable it with the `allow_experimental_s3queue` setting.");
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3Queue is experimental. "
"You can enable it with the `allow_experimental_s3queue` setting.");
}
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
// Use format settings from global server context + settings from
@ -582,10 +518,6 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
format_settings = getFormatSettings(args.getContext());
}
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3Queue>(
std::move(s3queue_settings),
std::move(configuration),
@ -594,12 +526,10 @@ void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
args.constraints,
args.comment,
args.getContext(),
format_settings,
partition_by);
format_settings);
},
{
.supports_settings = true,
.supports_sort_order = true, // for partition by
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});

View File

@ -1,32 +1,15 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Core/BackgroundSchedulePool.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3Settings.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Storages/StorageS3.h>
# include <Poco/URI.h>
# include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/StorageS3.h>
#include <Interpreters/Context.h>
namespace Aws::S3
{
@ -35,7 +18,7 @@ class Client;
namespace DB
{
class S3QueueFilesMetadata;
class StorageS3Queue : public IStorage, WithContext
{
@ -50,8 +33,7 @@ public:
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_ = nullptr);
std::optional<FormatSettings> format_settings_);
String getName() const override { return "S3Queue"; }
@ -64,79 +46,55 @@ public:
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;
void truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override;
NamesAndTypesList getVirtuals() const override { return virtual_columns; }
const auto & getFormatName() const { return configuration.format; }
const String & getZooKeeperPath() const { return zk_path; }
const fs::path & getZooKeeperPath() const { return zk_path; }
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
using FileIterator = StorageS3QueueSource::FileIterator;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const fs::path zk_path;
const S3QueueAction after_processing;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
const std::optional<FormatSettings> format_settings;
NamesAndTypesList virtual_columns;
BackgroundSchedulePool::TaskHolder task;
std::atomic<bool> stream_cancelled{false};
UInt64 reschedule_processing_interval_ms;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
String zk_path;
mutable zkutil::ZooKeeperPtr zk_client;
mutable std::mutex zk_mutex;
std::atomic<bool> mv_attached = false;
std::atomic<bool> shutdown_called{false};
std::atomic<bool> shutdown_called = false;
Poco::Logger * log;
bool supportsSubcolumns() const override;
bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }
void threadFunc();
size_t getTableDependentCount() const;
bool hasDependencies(const StorageID & table_id);
void startup() override;
void shutdown() override;
void drop() override;
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled{false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
};
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns(const ContextPtr & context_) const;
bool supportsSubcolumns() const override { return true; }
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, ASTPtr query);
std::shared_ptr<StorageS3QueueSource> createSource(
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
size_t max_block_size,
ContextPtr local_context);
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;
bool hasDependencies(const StorageID & table_id);
bool streamToViews();
void threadFunc();
std::shared_ptr<StorageS3QueueSource::IIterator>
createFileIterator(ContextPtr local_context, ASTPtr query);
void streamToViews();
void createOrCheckMetadata(const StorageInMemoryMetadata & storage_metadata);
void checkTableStructure(const String & zookeeper_prefix, const StorageInMemoryMetadata & storage_metadata);
Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
};

View File

@ -161,7 +161,7 @@ public:
/// We don't have to list bucket, because there is no asterisks.
if (key_prefix.size() == globbed_uri.key.size())
{
buffer.emplace_back(globbed_uri.key, std::nullopt);
buffer.emplace_back(std::make_shared<KeyWithInfo>(globbed_uri.key, std::nullopt));
buffer_iter = buffer.begin();
is_finished = true;
return;
@ -182,7 +182,7 @@ public:
fillInternalBufferAssumeLocked();
}
KeyWithInfo next()
KeyWithInfoPtr next()
{
std::lock_guard lock(mutex);
return nextAssumeLocked();
@ -201,7 +201,7 @@ public:
private:
using ListObjectsOutcome = Aws::S3::Model::ListObjectsV2Outcome;
KeyWithInfo nextAssumeLocked()
KeyWithInfoPtr nextAssumeLocked()
{
if (buffer_iter != buffer.end())
{
@ -210,11 +210,11 @@ private:
/// If url doesn't contain globs, we didn't list s3 bucket and didn't get object info for the key.
/// So we get object info lazily here on 'next()' request.
if (!answer.info)
if (!answer->info)
{
answer.info = S3::getObjectInfo(*client, globbed_uri.bucket, answer.key, globbed_uri.version_id, request_settings);
answer->info = S3::getObjectInfo(*client, globbed_uri.bucket, answer->key, globbed_uri.version_id, request_settings);
if (file_progress_callback)
file_progress_callback(FileProgress(0, answer.info->size));
file_progress_callback(FileProgress(0, answer->info->size));
}
return answer;
@ -287,7 +287,7 @@ private:
.last_modification_time = row.GetLastModified().Millis() / 1000,
};
temp_buffer.emplace_back(std::move(key), std::move(info));
temp_buffer.emplace_back(std::make_shared<KeyWithInfo>(std::move(key), std::move(info)));
}
}
@ -299,7 +299,7 @@ private:
if (!is_initialized)
{
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front().key, getContext());
filter_ast = VirtualColumnUtils::createPathAndFileFilterAst(query, virtual_columns, fs::path(globbed_uri.bucket) / temp_buffer.front()->key, getContext());
is_initialized = true;
}
@ -308,7 +308,7 @@ private:
std::vector<String> paths;
paths.reserve(temp_buffer.size());
for (const auto & key_with_info : temp_buffer)
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info.key);
paths.push_back(fs::path(globbed_uri.bucket) / key_with_info->key);
VirtualColumnUtils::filterByPathOrFile(temp_buffer, paths, query, virtual_columns, getContext(), filter_ast);
}
@ -317,8 +317,8 @@ private:
if (file_progress_callback)
{
for (const auto & [_, info] : buffer)
file_progress_callback(FileProgress(0, info->size));
for (const auto & key_with_info : buffer)
file_progress_callback(FileProgress(0, key_with_info->info->size));
}
/// Set iterator only after the whole batch is processed
@ -381,7 +381,7 @@ StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(
{
}
StorageS3Source::KeyWithInfo StorageS3Source::DisclosedGlobIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::DisclosedGlobIterator::next()
{
return pimpl->next();
}
@ -432,11 +432,11 @@ public:
if (read_keys_)
{
for (const auto & key : keys)
read_keys_->push_back({key, {}});
read_keys_->push_back(std::make_shared<KeyWithInfo>(key));
}
}
KeyWithInfo next()
KeyWithInfoPtr next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= keys.size())
@ -449,7 +449,7 @@ public:
file_progress_callback(FileProgress(0, info->size));
}
return {key, info};
return std::make_shared<KeyWithInfo>(key, info);
}
size_t objectsCount()
@ -486,7 +486,7 @@ StorageS3Source::KeysIterator::KeysIterator(
{
}
StorageS3Source::KeyWithInfo StorageS3Source::KeysIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::KeysIterator::next()
{
return pimpl->next();
}
@ -512,14 +512,14 @@ StorageS3Source::ReadTaskIterator::ReadTaskIterator(
pool.wait();
buffer.reserve(max_threads_count);
for (auto & key_future : keys)
buffer.emplace_back(key_future.get(), std::nullopt);
buffer.emplace_back(std::make_shared<KeyWithInfo>(key_future.get(), std::nullopt));
}
StorageS3Source::KeyWithInfo StorageS3Source::ReadTaskIterator::next()
StorageS3Source::KeyWithInfoPtr StorageS3Source::ReadTaskIterator::next()
{
size_t current_index = index.fetch_add(1, std::memory_order_relaxed);
if (current_index >= buffer.size())
return {callback(), {}};
return std::make_shared<KeyWithInfo>(callback());
return buffer[current_index];
}
@ -576,22 +576,22 @@ StorageS3Source::StorageS3Source(
StorageS3Source::ReaderHolder StorageS3Source::createReader()
{
KeyWithInfo key_with_info;
KeyWithInfoPtr key_with_info;
do
{
key_with_info = (*file_iterator)();
if (key_with_info.key.empty())
if (!key_with_info || key_with_info->key.empty())
return {};
if (!key_with_info.info)
key_with_info.info = S3::getObjectInfo(*client, bucket, key_with_info.key, version_id, request_settings);
if (!key_with_info->info)
key_with_info->info = S3::getObjectInfo(*client, bucket, key_with_info->key, version_id, request_settings);
}
while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info.info->size == 0);
while (getContext()->getSettingsRef().s3_skip_empty_files && key_with_info->info->size == 0);
QueryPipelineBuilder builder;
std::shared_ptr<ISource> source;
std::unique_ptr<ReadBuffer> read_buf;
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(key_with_info) : std::nullopt;
std::optional<size_t> num_rows_from_cache = need_only_count && getContext()->getSettingsRef().use_cache_for_count_from_files ? tryGetNumRowsFromCache(*key_with_info) : std::nullopt;
if (num_rows_from_cache)
{
/// We should not return single chunk with all number of rows,
@ -604,8 +604,8 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
}
else
{
auto compression_method = chooseCompressionMethod(key_with_info.key, compression_hint);
read_buf = createS3ReadBuffer(key_with_info.key, key_with_info.info->size);
auto compression_method = chooseCompressionMethod(key_with_info->key, compression_hint);
read_buf = createS3ReadBuffer(key_with_info->key, key_with_info->info->size);
auto input_format = FormatFactory::instance().getInput(
format,
@ -1505,7 +1505,7 @@ namespace
{
current_key_with_info = (*file_iterator)();
if (current_key_with_info.key.empty())
if (!current_key_with_info || current_key_with_info->key.empty())
{
if (first)
throw Exception(
@ -1526,15 +1526,15 @@ namespace
return nullptr;
}
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info.info && current_key_with_info.info->size == 0)
if (getContext()->getSettingsRef().s3_skip_empty_files && current_key_with_info->info && current_key_with_info->info->size == 0)
continue;
int zstd_window_log_max = static_cast<int>(getContext()->getSettingsRef().zstd_window_log_max);
auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info.key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
auto impl = std::make_unique<ReadBufferFromS3>(configuration.client, configuration.url.bucket, current_key_with_info->key, configuration.url.version_id, configuration.request_settings, getContext()->getReadSettings());
if (!getContext()->getSettingsRef().s3_skip_empty_files || !impl->eof())
{
first = false;
return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info.key, configuration.compression_method), zstd_window_log_max);
return wrapReadBufferWithCompressionMethod(std::move(impl), chooseCompressionMethod(current_key_with_info->key, configuration.compression_method), zstd_window_log_max);
}
}
}
@ -1549,7 +1549,7 @@ namespace
if (!getContext()->getSettingsRef().schema_inference_use_cache_for_s3)
return;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info.key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket / current_key_with_info->key;
auto key = getKeyForSchemaCache(source, configuration.format, format_settings, getContext());
StorageS3::getSchemaCache(getContext()).addNumRows(key, num_rows);
}
@ -1560,7 +1560,7 @@ namespace
const StorageS3::Configuration & configuration;
const std::optional<FormatSettings> & format_settings;
std::optional<ColumnsDescription> columns_from_cache;
StorageS3Source::KeyWithInfo current_key_with_info;
StorageS3Source::KeyWithInfoPtr current_key_with_info;
size_t prev_read_keys_size;
bool first = true;
};
@ -1700,9 +1700,9 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
auto get_last_mod_time = [&]
{
time_t last_modification_time = 0;
if (it->info)
if ((*it)->info)
{
last_modification_time = it->info->last_modification_time;
last_modification_time = (*it)->info->last_modification_time;
}
else
{
@ -1712,7 +1712,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
last_modification_time = S3::getObjectInfo(
*configuration.client,
configuration.url.bucket,
it->key,
(*it)->key,
configuration.url.version_id,
configuration.request_settings,
/*with_metadata=*/ false,
@ -1723,7 +1723,7 @@ std::optional<ColumnsDescription> StorageS3::tryGetColumnsFromCache(
return last_modification_time ? std::make_optional(last_modification_time) : std::nullopt;
};
String path = fs::path(configuration.url.bucket) / it->key;
String path = fs::path(configuration.url.bucket) / (*it)->key;
String source = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / path;
auto cache_key = getKeyForSchemaCache(source, configuration.format, format_settings, ctx);
auto columns = schema_cache.tryGetColumns(cache_key, get_last_mod_time);
@ -1745,7 +1745,7 @@ void StorageS3::addColumnsToCache(
auto host_and_bucket = fs::path(configuration.url.uri.getHost() + std::to_string(configuration.url.uri.getPort())) / configuration.url.bucket;
Strings sources;
sources.reserve(keys.size());
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem.key; });
std::transform(keys.begin(), keys.end(), std::back_inserter(sources), [&](const auto & elem){ return host_and_bucket / elem->key; });
auto cache_keys = getKeysForSchemaCache(sources, format_name, format_settings, ctx);
auto & schema_cache = getSchemaCache(ctx);
schema_cache.addManyColumns(cache_keys, columns);

View File

@ -43,22 +43,24 @@ public:
struct KeyWithInfo
{
KeyWithInfo() = default;
KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_)
: key(std::move(key_)), info(std::move(info_))
{
}
explicit KeyWithInfo(String key_, std::optional<S3::ObjectInfo> info_ = std::nullopt)
: key(std::move(key_)), info(std::move(info_)) {}
virtual ~KeyWithInfo() = default;
String key;
std::optional<S3::ObjectInfo> info;
};
using KeyWithInfoPtr = std::shared_ptr<KeyWithInfo>;
using KeysWithInfo = std::vector<KeyWithInfo>;
using KeysWithInfo = std::vector<KeyWithInfoPtr>;
class IIterator
{
public:
virtual ~IIterator() = default;
virtual KeyWithInfo next() = 0;
virtual KeyWithInfoPtr next() = 0;
/// Estimates how many streams we need to process all files.
/// If keys count >= max_threads_count, the returned number may not represent the actual number of the keys.
@ -66,7 +68,7 @@ public:
/// fixme: May underestimate if the glob has a strong filter, so there are few matches among the first 1000 ListObjects results.
virtual size_t estimatedKeysCount() = 0;
KeyWithInfo operator ()() { return next(); }
KeyWithInfoPtr operator ()() { return next(); }
};
class DisclosedGlobIterator : public IIterator
@ -82,7 +84,7 @@ public:
const S3Settings::RequestSettings & request_settings_ = {},
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
@ -106,7 +108,7 @@ public:
KeysWithInfo * read_keys = nullptr,
std::function<void(FileProgress)> progress_callback_ = {});
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
@ -120,7 +122,7 @@ public:
public:
explicit ReadTaskIterator(const ReadTaskCallback & callback_, const size_t max_threads_count);
KeyWithInfo next() override;
KeyWithInfoPtr next() override;
size_t estimatedKeysCount() override;
private:
@ -176,13 +178,13 @@ private:
{
public:
ReaderHolder(
KeyWithInfo key_with_info_,
KeyWithInfoPtr key_with_info_,
String bucket_,
std::unique_ptr<ReadBuffer> read_buf_,
std::shared_ptr<ISource> source_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
: key_with_info(std::move(key_with_info_))
: key_with_info(key_with_info_)
, bucket(std::move(bucket_))
, read_buf(std::move(read_buf_))
, source(std::move(source_))
@ -216,14 +218,14 @@ private:
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
String getPath() const { return fs::path(bucket) / key_with_info.key; }
const String & getFile() const { return key_with_info.key; }
const KeyWithInfo & getKeyWithInfo() const { return key_with_info; }
String getPath() const { return fs::path(bucket) / key_with_info->key; }
const String & getFile() const { return key_with_info->key; }
const KeyWithInfo & getKeyWithInfo() const { return *key_with_info; }
const IInputFormat * getInputFormat() const { return dynamic_cast<const IInputFormat *>(source.get()); }
private:
KeyWithInfo key_with_info;
KeyWithInfoPtr key_with_info;
String bucket;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<ISource> source;

View File

@ -82,7 +82,13 @@ RemoteQueryExecutor::Extension StorageS3Cluster::getTaskIteratorExtension(ASTPtr
{
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(
*s3_configuration.client, s3_configuration.url, query, virtual_columns, context, nullptr, s3_configuration.request_settings, context->getFileProgressCallback());
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String { return iterator->next().key; });
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
{
if (auto next = iterator->next())
return next->key;
return "";
});
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
}

View File

@ -0,0 +1,73 @@
#include "StorageSystemS3Queue.h"
#include <Access/ContextAccess.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/S3Queue/S3QueueFilesMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Disks/IDisk.h>
namespace DB
{
NamesAndTypesList StorageSystemS3Queue::getNamesAndTypes()
{
return {
{"zookeeper_path", std::make_shared<DataTypeString>()},
{"file_name", std::make_shared<DataTypeString>()},
{"rows_processed", std::make_shared<DataTypeUInt64>()},
{"status", std::make_shared<DataTypeString>()},
{"processing_start_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"processing_end_time", std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDateTime>())},
{"ProfileEvents", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeUInt64>())},
{"exception", std::make_shared<DataTypeString>()},
};
}
StorageSystemS3Queue::StorageSystemS3Queue(const StorageID & table_id_)
: IStorageSystemOneBlock(table_id_)
{
}
void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, const SelectQueryInfo &) const
{
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
{
for (const auto & [file_name, file_status] : metadata->getFileStateses())
{
size_t i = 0;
res_columns[i++]->insert(zookeeper_path);
res_columns[i++]->insert(file_name);
std::lock_guard lock(file_status->metadata_lock);
res_columns[i++]->insert(file_status->processed_rows.load());
res_columns[i++]->insert(magic_enum::enum_name(file_status->state));
if (file_status->processing_start_time)
res_columns[i++]->insert(file_status->processing_start_time);
else
res_columns[i++]->insertDefault();
if (file_status->processing_end_time)
res_columns[i++]->insert(file_status->processing_end_time);
else
res_columns[i++]->insertDefault();
ProfileEvents::dumpToMapColumn(file_status->profile_counters.getPartiallyAtomicSnapshot(), res_columns[i++].get(), true);
res_columns[i++]->insert(file_status->last_exception);
}
}
}
}
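This new system table surfaces the in-memory file statuses tracked per ZooKeeper path. A minimal query sketch against it (the table is registered as `system.s3queue` further below; the column names are the ones declared in `getNamesAndTypes()`):

```sql
-- Inspect per-file processing state of all S3Queue tables on this server.
SELECT zookeeper_path, file_name, rows_processed, status, exception
FROM system.s3queue
ORDER BY zookeeper_path, file_name;
```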

View File

@ -0,0 +1,23 @@
#pragma once
#include "config.h"
#include <Storages/System/IStorageSystemOneBlock.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
namespace DB
{
class StorageSystemS3Queue final : public IStorageSystemOneBlock<StorageSystemS3Queue>
{
public:
explicit StorageSystemS3Queue(const StorageID & table_id_);
std::string getName() const override { return "SystemS3Queue"; }
static NamesAndTypesList getNamesAndTypes();
protected:
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo & query_info) const override;
};
}

View File

@ -84,6 +84,7 @@
#include <Storages/System/StorageSystemZooKeeperConnection.h>
#include <Storages/System/StorageSystemJemalloc.h>
#include <Storages/System/StorageSystemScheduler.h>
#include <Storages/System/StorageSystemS3Queue.h>
#if USE_RDKAFKA
#include <Storages/System/StorageSystemKafkaConsumers.h>
@ -196,6 +197,7 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
attach<StorageSystemNamedCollections>(context, system_database, "named_collections");
attach<StorageSystemUserProcesses>(context, system_database, "user_processes");
attach<StorageSystemJemallocBins>(context, system_database, "jemalloc_bins");
attach<StorageSystemS3Queue>(context, system_database, "s3queue");
if (has_zookeeper)
{

View File

@ -3,6 +3,7 @@
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
<allow_experimental_s3queue>1</allow_experimental_s3queue>
<s3queue_enable_logging_to_s3queue_log>1</s3queue_enable_logging_to_s3queue_log>
</default>
</profiles>
</clickhouse>
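This test profile turns on queue logging for everything run under the `default` profile. Assuming the log lands in a `system.s3queue_log` table with the usual `*_log` layout (both the table name and the `event_time` column are assumptions, not shown in this diff), recent entries could be checked with:

```sql
-- Assumes system.s3queue_log exists and carries the standard event_time column.
SELECT *
FROM system.s3queue_log
WHERE event_time > now() - INTERVAL 1 HOUR
LIMIT 10;
```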

View File

@ -0,0 +1,16 @@
<clickhouse>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
</zookeeper>
</clickhouse>

File diff suppressed because it is too large