diff --git a/docs/en/engines/table-engines/index.md b/docs/en/engines/table-engines/index.md
index bd704d0e87e..b024820024a 100644
--- a/docs/en/engines/table-engines/index.md
+++ b/docs/en/engines/table-engines/index.md
@@ -60,6 +60,7 @@ Engines in the family:
- [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md)
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md)
- [PostgreSQL](../../engines/table-engines/integrations/postgresql.md)
+- [S3Queue](../../engines/table-engines/integrations/s3queue.md)
### Special Engines {#special-engines}
diff --git a/docs/en/engines/table-engines/integrations/s3queue.md b/docs/en/engines/table-engines/integrations/s3queue.md
new file mode 100644
index 00000000000..3e6cf88312f
--- /dev/null
+++ b/docs/en/engines/table-engines/integrations/s3queue.md
@@ -0,0 +1,224 @@
+---
+slug: /en/engines/table-engines/integrations/s3queue
+sidebar_position: 7
+sidebar_label: S3Queue
+---
+
+# S3Queue Table Engine
+This engine provides integration with the [Amazon S3](https://aws.amazon.com/s3/) ecosystem and allows streaming imports. It is similar to the [Kafka](../../../engines/table-engines/integrations/kafka.md) and [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md) engines, but provides S3-specific features.
+
+## Create Table {#creating-a-table}
+
+``` sql
+CREATE TABLE s3_queue_engine_table (name String, value UInt32)
+ ENGINE = S3Queue(path [, NOSIGN | aws_access_key_id, aws_secret_access_key,] format, [compression])
+ [SETTINGS]
+ [mode = 'unordered',]
+ [after_processing = 'keep',]
+ [keeper_path = '',]
+ [s3queue_loading_retries = 0,]
+ [s3queue_polling_min_timeout_ms = 1000,]
+ [s3queue_polling_max_timeout_ms = 10000,]
+ [s3queue_polling_backoff_ms = 0,]
+ [s3queue_tracked_files_limit = 1000,]
+ [s3queue_tracked_file_ttl_sec = 0,]
+ [s3queue_polling_size = 50,]
+```
+
+**Engine parameters**
+
+- `path` — Bucket URL with a path to the file. Supports the following wildcards in read-only mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [below](#wildcards-in-path).
+- `NOSIGN` - If this keyword is provided in place of credentials, requests will not be signed.
+- `format` — The [format](../../../interfaces/formats.md#formats) of the file.
+- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. The parameter is optional. If credentials are not specified, they are taken from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
+- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. The parameter is optional. By default, the compression type is autodetected from the file extension.
+
+**Example**
+
+```sql
+CREATE TABLE s3queue_engine_table (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
+SETTINGS
+    mode = 'ordered';
+```
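+
+For a publicly readable bucket, the `NOSIGN` keyword can be passed in place of credentials, following the syntax above (a sketch; the table name is arbitrary):
+
+```sql
+CREATE TABLE s3queue_public_table (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', NOSIGN, 'CSV')
+SETTINGS
+    mode = 'ordered';
+```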
+
+Using named collections:
+
+``` xml
+<clickhouse>
+    <named_collections>
+        <s3queue_conf>
+            <url>https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
+            <access_key_id>test</access_key_id>
+            <secret_access_key>test</secret_access_key>
+        </s3queue_conf>
+    </named_collections>
+</clickhouse>
+```
+
+```sql
+CREATE TABLE s3queue_engine_table (name String, value UInt32)
+ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
+SETTINGS
+    mode = 'ordered';
+```
+
+## Settings {#s3queue-settings}
+
+### mode {#mode}
+
+Possible values:
+
+- unordered — With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKeeper.
+- ordered — With ordered mode, only the maximum name among successfully consumed files, and the names of files that will be retried after an unsuccessful loading attempt, are stored in ZooKeeper.
+
+Default value: `unordered`.
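+
+For example, a minimal sketch of an ordered-mode table (the bucket URL is taken from the examples above; the table name is arbitrary):
+
+```sql
+CREATE TABLE s3queue_ordered_table (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV')
+SETTINGS
+    mode = 'ordered';
+```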
+
+### after_processing {#after_processing}
+
+Delete or keep file after successful processing.
+Possible values:
+
+- keep.
+- delete.
+
+Default value: `keep`.
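+
+For example, a sketch of a table that deletes each file from the bucket after it has been successfully imported (the table name is arbitrary):
+
+```sql
+CREATE TABLE s3queue_delete_after_processing (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV')
+SETTINGS
+    mode = 'unordered',
+    after_processing = 'delete';
+```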
+
+### keeper_path {#keeper_path}
+
+The path in ZooKeeper can be specified as a table engine setting, or the default path can be formed from the globally configured path and the table UUID.
+Possible values:
+
+- String.
+
+Default value: `/`.
+
+### s3queue_loading_retries {#s3queue_loading_retries}
+
+Retry file loading up to the specified number of times. By default, there are no retries.
+Possible values:
+
+- Positive integer.
+
+Default value: `0`.
+
+### s3queue_polling_min_timeout_ms {#s3queue_polling_min_timeout_ms}
+
+Minimal timeout before next polling (in milliseconds).
+
+Possible values:
+
+- Positive integer.
+
+Default value: `1000`.
+
+### s3queue_polling_max_timeout_ms {#s3queue_polling_max_timeout_ms}
+
+Maximum timeout before next polling (in milliseconds).
+
+Possible values:
+
+- Positive integer.
+
+Default value: `10000`.
+
+### s3queue_polling_backoff_ms {#s3queue_polling_backoff_ms}
+
+Polling backoff (in milliseconds).
+
+Possible values:
+
+- Positive integer.
+
+Default value: `0`.
+
+### s3queue_tracked_files_limit {#s3queue_tracked_files_limit}
+
+Allows limiting the number of ZooKeeper nodes when the 'unordered' mode is used; does nothing in 'ordered' mode.
+If the limit is reached, the oldest processed files are deleted from the ZooKeeper node and processed again.
+
+Possible values:
+
+- Positive integer.
+
+Default value: `1000`.
+
+### s3queue_tracked_file_ttl_sec {#s3queue_tracked_file_ttl_sec}
+
+Maximum number of seconds to store processed files in the ZooKeeper node (stored forever by default) in 'unordered' mode; does nothing in 'ordered' mode.
+After the specified number of seconds, the file will be re-imported.
+
+Possible values:
+
+- Positive integer.
+
+Default value: `0`.
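+
+As an illustration, the two tracking settings above can be combined in 'unordered' mode so that at most 5000 files are remembered and each entry expires after one day (values are illustrative only):
+
+```sql
+CREATE TABLE s3queue_tracked_table (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV')
+SETTINGS
+    mode = 'unordered',
+    s3queue_tracked_files_limit = 5000,
+    s3queue_tracked_file_ttl_sec = 86400;
+```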
+
+### s3queue_polling_size {#s3queue_polling_size}
+
+Maximum number of files to fetch from S3 with a SELECT query or in the background task.
+The engine takes files for processing from S3 in batches.
+The batch size is limited to increase concurrency when multiple table engines with the same `keeper_path` consume files from the same path.
+
+Possible values:
+
+- Positive integer.
+
+Default value: `50`.
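+
+A sketch of tuning the polling-related settings together (values are illustrative only):
+
+```sql
+CREATE TABLE s3queue_polling_table (name String, value UInt32)
+ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV')
+SETTINGS
+    mode = 'unordered',
+    s3queue_polling_min_timeout_ms = 1000,
+    s3queue_polling_max_timeout_ms = 10000,
+    s3queue_polling_backoff_ms = 500,
+    s3queue_polling_size = 100;
+```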
+
+
+## S3-related Settings {#s3-settings}
+
+The engine supports all S3-related settings. For more information about S3 settings, see [here](../../../engines/table-engines/integrations/s3.md).
+
+
+## Description {#description}
+
+`SELECT` is not particularly useful for streaming import (except for debugging), because each file can be imported only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
+
+1. Use the engine to create a table for consuming from the specified path in S3 and consider it a data stream.
+2. Create a table with the desired structure.
+3. Create a materialized view that converts data from the engine and puts it into a previously created table.
+
+When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background.
+
+Example:
+
+``` sql
+ CREATE TABLE s3queue_engine_table (name String, value UInt32)
+ ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
+ SETTINGS
+    mode = 'unordered',
+ keeper_path = '/clickhouse/s3queue/';
+
+ CREATE TABLE stats (name String, value UInt32)
+ ENGINE = MergeTree() ORDER BY name;
+
+ CREATE MATERIALIZED VIEW consumer TO stats
+ AS SELECT name, value FROM s3queue_engine_table;
+
+ SELECT * FROM stats ORDER BY name;
+```
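+
+Background consumption runs only while at least one materialized view is attached to the engine table, so detaching the view stops it and attaching it again resumes it (using the `consumer` view from the example above):
+
+```sql
+DETACH TABLE consumer;
+ATTACH TABLE consumer;
+```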
+
+## Virtual columns {#virtual-columns}
+
+- `_path` — Path to the file.
+- `_file` — Name of the file.
+
+For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
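+
+For example, a debugging query that reads the virtual columns alongside the data (a direct `SELECT` consumes the files it reads and requires the `stream_like_engine_allow_direct_select` setting):
+
+```sql
+SELECT _path, _file, name, value
+FROM s3queue_engine_table
+SETTINGS stream_like_engine_allow_direct_select = 1;
+```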
+
+
+## Wildcards In Path {#wildcards-in-path}
+
+The `path` argument can specify multiple files using bash-like wildcards. To be processed, a file must exist and match the whole path pattern. The listing of files is determined during `SELECT` (not at `CREATE` time).
+
+- `*` — Substitutes any number of any characters except `/`, including the empty string.
+- `?` — Substitutes any single character.
+- `{some_string,another_string,yet_another_one}` — Substitutes any of the strings `'some_string', 'another_string', 'yet_another_one'`.
+- `{N..M}` — Substitutes any number in the range from N to M, including both endpoints. N and M can have leading zeroes, e.g. `000..078`.
+
+Constructions with `{}` are similar to the [remote](../../../sql-reference/table-functions/remote.md) table function.
+
+:::note
+If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
+:::
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5c66c7e9495..36fec922645 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -248,6 +248,7 @@ add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
+add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)
diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 4e0ce7e4773..13fde626f16 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -104,6 +104,7 @@ class IColumn;
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
+ M(String, s3queue_default_zookeeper_path, "/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \
diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp
index 86400954e2f..f3e7b692085 100644
--- a/src/Core/SettingsEnums.cpp
+++ b/src/Core/SettingsEnums.cpp
@@ -175,4 +175,11 @@ IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
+IMPLEMENT_SETTING_ENUM(S3QueueMode, ErrorCodes::BAD_ARGUMENTS,
+ {{"ordered", S3QueueMode::ORDERED},
+ {"unordered", S3QueueMode::UNORDERED}})
+
+IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
+ {{"keep", S3QueueAction::KEEP},
+ {"delete", S3QueueAction::DELETE}})
}
diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h
index c61afbd2bbf..da0163f8b6e 100644
--- a/src/Core/SettingsEnums.h
+++ b/src/Core/SettingsEnums.h
@@ -221,4 +221,21 @@ enum class ParallelReplicasCustomKeyFilterType : uint8_t
DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
+
+enum class S3QueueMode
+{
+ ORDERED,
+ UNORDERED,
+};
+
+DECLARE_SETTING_ENUM(S3QueueMode)
+
+enum class S3QueueAction
+{
+ KEEP,
+ DELETE,
+};
+
+DECLARE_SETTING_ENUM(S3QueueAction)
+
}
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.cpp b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
new file mode 100644
index 00000000000..514baeb7b07
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.cpp
@@ -0,0 +1,351 @@
+#include "IO/VarInt.h"
+#include "config.h"
+
+#if USE_AWS_S3
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int TIMEOUT_EXCEEDED;
+}
+
+namespace
+{
+ UInt64 getCurrentTime()
+ {
+        return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+ }
+}
+
+void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in)
+{
+ files = {};
+ if (in.eof())
+ return;
+
+ size_t files_num;
+ in >> files_num >> "\n";
+ while (files_num--)
+ {
+ TrackedCollectionItem item;
+ in >> item.file_path >> "\n";
+ in >> item.timestamp >> "\n";
+ in >> item.retries_count >> "\n";
+ in >> item.last_exception >> "\n";
+ files.push_back(item);
+ }
+}
+
+void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const
+{
+ out << files.size() << "\n";
+ for (const auto & processed_file : files)
+ {
+ out << processed_file.file_path << "\n";
+ out << processed_file.timestamp << "\n";
+ out << processed_file.retries_count << "\n";
+ out << processed_file.last_exception << "\n";
+ }
+}
+
+String S3QueueFilesMetadata::S3QueueCollection::toString() const
+{
+ WriteBufferFromOwnString out;
+ write(out);
+ return out.str();
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames()
+{
+ S3FilesCollection keys = {};
+ for (const auto & pair : files)
+ keys.insert(pair.file_path);
+ return keys;
+}
+
+
+S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
+ : max_size(max_size_), max_age(max_age_)
+{
+}
+
+void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str)
+{
+ ReadBufferFromString buf(collection_str);
+ read(buf);
+ if (max_age > 0) // Remove old items
+ {
+ std::erase_if(
+ files,
+ [timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file)
+ { return (timestamp - processed_file.timestamp) > max_age; });
+ }
+}
+
+
+void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name)
+{
+ TrackedCollectionItem processed_file = { .file_path=file_name, .timestamp = getCurrentTime() };
+ files.push_back(processed_file);
+
+ /// TODO: it is strange that in parse() we take into account only max_age, but here only max_size.
+ while (files.size() > max_size)
+ {
+ files.pop_front();
+ }
+}
+
+
+S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_)
+ : max_retries_count(max_retries_count_)
+{
+}
+
+void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str)
+{
+ ReadBufferFromString buf(collection_str);
+ read(buf);
+}
+
+
+bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
+{
+ auto failed_it = std::find_if(
+ files.begin(), files.end(),
+ [&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
+
+ if (failed_it == files.end())
+ {
+ files.emplace_back(file_name, 0, max_retries_count, exception_message);
+ }
+ else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0)
+ {
+ return false;
+ }
+ return true;
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames()
+{
+ S3FilesCollection failed_keys;
+ for (const auto & pair : files)
+ {
+ if (pair.retries_count == 0)
+ failed_keys.insert(pair.file_path);
+ }
+ return failed_keys;
+}
+
+void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str)
+{
+ ReadBufferFromString rb(collection_str);
+ Strings result;
+ readQuoted(result, rb);
+ files = S3FilesCollection(result.begin(), result.end());
+}
+
+void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names)
+{
+ files.insert(file_names.begin(), file_names.end());
+}
+
+void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name)
+{
+ files.erase(file_name);
+}
+
+String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const
+{
+ return DB::toString(Strings(files.begin(), files.end()));
+}
+
+
+S3QueueFilesMetadata::S3QueueFilesMetadata(
+ const StorageS3Queue * storage_,
+ const S3QueueSettings & settings_)
+ : storage(storage_)
+ , mode(settings_.mode)
+ , max_set_size(settings_.s3queue_tracked_files_limit.value)
+ , max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
+ , max_loading_retries(settings_.s3queue_loading_retries.value)
+ , zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing")
+ , zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed")
+ , zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed")
+ , zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock")
+ , log(&Poco::Logger::get("S3QueueFilesMetadata"))
+{
+}
+
+void S3QueueFilesMetadata::setFileProcessed(const String & file_path)
+{
+ auto zookeeper = storage->getZooKeeper();
+ auto lock = acquireLock(zookeeper);
+
+ switch (mode)
+ {
+ case S3QueueMode::UNORDERED:
+ {
+ S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec);
+ processed_files.parse(zookeeper->get(zookeeper_processed_path));
+ processed_files.add(file_path);
+ zookeeper->set(zookeeper_processed_path, processed_files.toString());
+ break;
+ }
+ case S3QueueMode::ORDERED:
+ {
+ // Check that we set in ZooKeeper node only maximum processed file path.
+ // This check can be useful, when multiple table engines consume in ordered mode.
+ String max_file = getMaxProcessedFile();
+ if (max_file.compare(file_path) <= 0)
+ zookeeper->set(zookeeper_processed_path, file_path);
+ break;
+ }
+ }
+ removeProcessingFile(file_path);
+}
+
+
+bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message)
+{
+ auto zookeeper = storage->getZooKeeper();
+ auto lock = acquireLock(zookeeper);
+
+ S3QueueFailedCollection failed_collection(max_loading_retries);
+ failed_collection.parse(zookeeper->get(zookeeper_failed_path));
+ const bool can_be_retried = failed_collection.add(file_path, exception_message);
+ zookeeper->set(zookeeper_failed_path, failed_collection.toString());
+ removeProcessingFile(file_path);
+ return can_be_retried;
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles()
+{
+ auto zookeeper = storage->getZooKeeper();
+ String failed_files = zookeeper->get(zookeeper_failed_path);
+
+ S3QueueFailedCollection failed_collection(max_loading_retries);
+ failed_collection.parse(failed_files);
+ return failed_collection.getFileNames();
+}
+
+String S3QueueFilesMetadata::getMaxProcessedFile()
+{
+ auto zookeeper = storage->getZooKeeper();
+ return zookeeper->get(zookeeper_processed_path);
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles()
+{
+ auto zookeeper = storage->getZooKeeper();
+ String processing_files;
+ if (!zookeeper->tryGet(zookeeper_processing_path, processing_files))
+ return {};
+
+ S3QueueProcessingCollection processing_collection;
+ if (!processing_files.empty())
+ processing_collection.parse(processing_files);
+ return processing_collection.getFileNames();
+}
+
+void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths)
+{
+ auto zookeeper = storage->getZooKeeper();
+ String processing_files;
+ zookeeper->tryGet(zookeeper_processing_path, processing_files);
+
+ S3QueueProcessingCollection processing_collection;
+ if (!processing_files.empty())
+ processing_collection.parse(processing_files);
+ processing_collection.add(file_paths);
+
+ if (zookeeper->exists(zookeeper_processing_path))
+ zookeeper->set(zookeeper_processing_path, processing_collection.toString());
+ else
+ zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral);
+}
+
+void S3QueueFilesMetadata::removeProcessingFile(const String & file_path)
+{
+ auto zookeeper = storage->getZooKeeper();
+ String processing_files;
+ zookeeper->tryGet(zookeeper_processing_path, processing_files);
+
+ S3QueueProcessingCollection processing_collection;
+ processing_collection.parse(processing_files);
+ processing_collection.remove(file_path);
+ zookeeper->set(zookeeper_processing_path, processing_collection.toString());
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles()
+{
+ auto zookeeper = storage->getZooKeeper();
+ S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec);
+ processed_collection.parse(zookeeper->get(zookeeper_processed_path));
+ return processed_collection.getFileNames();
+}
+
+S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles()
+{
+ S3FilesCollection processed_and_failed_files = getFailedFiles();
+ switch (mode)
+ {
+ case S3QueueMode::UNORDERED:
+ {
+ processed_and_failed_files.merge(getUnorderedProcessedFiles());
+ break;
+ }
+ case S3QueueMode::ORDERED:
+ {
+ processed_and_failed_files.insert(getMaxProcessedFile());
+ break;
+ }
+ }
+ processed_and_failed_files.merge(getProcessingFiles());
+ return processed_and_failed_files;
+}
+
+std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper)
+{
+ UInt32 retry_count = 200;
+ UInt32 sleep_ms = 100;
+ UInt32 retries = 0;
+
+ while (true)
+ {
+ Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
+ if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
+ {
+ retries++;
+ if (retries > retry_count)
+ {
+ throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock");
+ }
+ sleepForMilliseconds(sleep_ms);
+ }
+ else if (code != Coordination::Error::ZOK)
+ {
+ throw Coordination::Exception(code, zookeeper_lock_path);
+ }
+ else
+ {
+ return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper);
+ }
+ }
+}
+
+}
+
+#endif
diff --git a/src/Storages/S3Queue/S3QueueFilesMetadata.h b/src/Storages/S3Queue/S3QueueFilesMetadata.h
new file mode 100644
index 00000000000..c436de946ff
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueFilesMetadata.h
@@ -0,0 +1,124 @@
+#pragma once
+
+#if USE_AWS_S3
+
+# include
+# include
+# include
+# include
+
+namespace DB
+{
+class StorageS3Queue;
+struct S3QueueSettings;
+
+class S3QueueFilesMetadata
+{
+public:
+ struct TrackedCollectionItem
+ {
+ String file_path;
+ UInt64 timestamp = 0;
+ UInt64 retries_count = 0;
+ String last_exception;
+ };
+
+    using S3FilesCollection = std::unordered_set<String>;
+    using TrackedFiles = std::deque<TrackedCollectionItem>;
+
+ S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
+
+ void setFilesProcessing(const Strings & file_paths);
+ void setFileProcessed(const String & file_path);
+ bool setFileFailed(const String & file_path, const String & exception_message);
+
+ S3FilesCollection getProcessedFailedAndProcessingFiles();
+ String getMaxProcessedFile();
+    std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
+
+ struct S3QueueCollection
+ {
+ public:
+ virtual ~S3QueueCollection() = default;
+ virtual String toString() const;
+ S3FilesCollection getFileNames();
+
+ virtual void parse(const String & collection_str) = 0;
+
+ protected:
+ TrackedFiles files;
+
+ void read(ReadBuffer & in);
+ void write(WriteBuffer & out) const;
+ };
+
+ struct S3QueueProcessedCollection : public S3QueueCollection
+ {
+ public:
+ S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
+
+ void parse(const String & collection_str) override;
+ void add(const String & file_name);
+
+ private:
+ const UInt64 max_size;
+ const UInt64 max_age;
+ };
+
+ struct S3QueueFailedCollection : S3QueueCollection
+ {
+ public:
+ S3QueueFailedCollection(const UInt64 & max_retries_count_);
+
+ void parse(const String & collection_str) override;
+ bool add(const String & file_name, const String & exception_message);
+
+ S3FilesCollection getFileNames();
+
+ private:
+ UInt64 max_retries_count;
+ };
+
+ struct S3QueueProcessingCollection
+ {
+ public:
+ S3QueueProcessingCollection() = default;
+
+ void parse(const String & collection_str);
+ void add(const Strings & file_names);
+ void remove(const String & file_name);
+
+ String toString() const;
+ const S3FilesCollection & getFileNames() const { return files; }
+
+ private:
+ S3FilesCollection files;
+ };
+
+private:
+ const StorageS3Queue * storage;
+ const S3QueueMode mode;
+ const UInt64 max_set_size;
+ const UInt64 max_set_age_sec;
+ const UInt64 max_loading_retries;
+
+ const String zookeeper_processing_path;
+ const String zookeeper_processed_path;
+ const String zookeeper_failed_path;
+ const String zookeeper_lock_path;
+
+ mutable std::mutex mutex;
+ Poco::Logger * log;
+
+ S3FilesCollection getFailedFiles();
+ S3FilesCollection getProcessingFiles();
+ S3FilesCollection getUnorderedProcessedFiles();
+
+ void removeProcessingFile(const String & file_path);
+};
+
+
+}
+
+
+#endif
diff --git a/src/Storages/S3Queue/S3QueueSettings.cpp b/src/Storages/S3Queue/S3QueueSettings.cpp
new file mode 100644
index 00000000000..b74cf8d39bb
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueSettings.cpp
@@ -0,0 +1,41 @@
+#include
+#include
+#include
+#include
+#include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int UNKNOWN_SETTING;
+}
+
+IMPLEMENT_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
+
+void S3QueueSettings::loadFromQuery(ASTStorage & storage_def)
+{
+ if (storage_def.settings)
+ {
+ try
+ {
+ applyChanges(storage_def.settings->changes);
+ }
+ catch (Exception & e)
+ {
+ if (e.code() == ErrorCodes::UNKNOWN_SETTING)
+ e.addMessage("for storage " + storage_def.engine->name);
+ throw;
+ }
+ }
+ else
+ {
+        auto settings_ast = std::make_shared<ASTSetQuery>();
+ settings_ast->is_standalone = false;
+ storage_def.set(storage_def.settings, settings_ast);
+ }
+}
+
+}
diff --git a/src/Storages/S3Queue/S3QueueSettings.h b/src/Storages/S3Queue/S3QueueSettings.h
new file mode 100644
index 00000000000..75defc4a57f
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueSettings.h
@@ -0,0 +1,46 @@
+#pragma once
+
+#include
+#include
+#include
+
+
+namespace DB
+{
+class ASTStorage;
+
+
+#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
+ M(S3QueueMode, \
+ mode, \
+ S3QueueMode::ORDERED, \
+ "With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
+ "With ordered mode, only the max name of the successfully consumed file stored.", \
+ 0) \
+ M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
+ M(String, keeper_path, "", "Zookeeper node path", 0) \
+ M(UInt64, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
+ M(UInt64, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
+ M(UInt64, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
+ M(UInt64, s3queue_polling_backoff_ms, 0, "Polling backoff", 0) \
+ M(UInt64, s3queue_tracked_files_limit, 1000, "Max set size for tracking processed files in unordered mode in ZooKeeper", 0) \
+ M(UInt64, \
+ s3queue_tracked_file_ttl_sec, \
+ 0, \
+ "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", \
+ 0) \
+ M(UInt64, s3queue_polling_size, 50, "Maximum files to fetch from S3 with SELECT", 0)
+
+#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
+ S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
+ FORMAT_FACTORY_SETTINGS(M, ALIAS)
+
+DECLARE_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
+
+
+struct S3QueueSettings : public BaseSettings<S3QueueSettingsTraits>
+{
+ void loadFromQuery(ASTStorage & storage_def);
+};
+
+}
diff --git a/src/Storages/S3Queue/S3QueueSource.cpp b/src/Storages/S3Queue/S3QueueSource.cpp
new file mode 100644
index 00000000000..57d2d6304b0
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueSource.cpp
@@ -0,0 +1,321 @@
+#include
+#include
+#include
+#include "IO/ParallelReadBuffer.h"
+#include "Parsers/ASTCreateQuery.h"
+#include "config.h"
+
+#if USE_AWS_S3
+
+# include
+
+# include
+
+# include
+# include
+
+# include
+
+# include
+# include
+
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+
+# include
+
+# include
+# include
+# include
+
+# include
+
+# include
+
+# include
+# include
+# include
+
+# include
+# include
+
+
+namespace CurrentMetrics
+{
+extern const Metric StorageS3Threads;
+extern const Metric StorageS3ThreadsActive;
+}
+
+namespace ProfileEvents
+{
+extern const Event S3DeleteObjects;
+extern const Event S3ListObjects;
+}
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int S3_ERROR;
+}
+
+
+StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
+ const S3::Client & client_,
+ const S3::URI & globbed_uri_,
+ ASTPtr query,
+ const Block & virtual_header,
+ ContextPtr context,
+ UInt64 & max_poll_size_,
+ const S3Settings::RequestSettings & request_settings_)
+ : max_poll_size(max_poll_size_)
+    , glob_iterator(std::make_unique<DisclosedGlobIterator>(
+ client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
+{
+ /// todo(kssenii): remove this loop, it should not be here
+ while (true)
+ {
+ KeyWithInfo val = glob_iterator->next();
+ if (val.key.empty())
+ break;
+ keys_buf.push_back(val);
+ }
+}
+
+Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
+ const S3QueueMode & engine_mode, std::unordered_set & exclude_keys, const String & max_file)
+{
+ for (const KeyWithInfo & val : keys_buf)
+ {
+ auto full_path = val.key;
+ if (exclude_keys.find(full_path) != exclude_keys.end())
+ {
+ LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list "
+ "(either already processed or failed to be processed)", val.key);
+ continue;
+ }
+
+ if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0))
+ continue;
+
+ if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED))
+ {
+ processing_keys.push_back(val);
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ if (engine_mode == S3QueueMode::ORDERED)
+ {
+ std::sort(
+ processing_keys.begin(),
+ processing_keys.end(),
+ [](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; });
+
+ if (processing_keys.size() > max_poll_size)
+ {
+ processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end());
+ }
+ }
+
+ Strings keys;
+ for (const auto & key_info : processing_keys)
+ keys.push_back(key_info.key);
+
+ processing_keys.push_back(KeyWithInfo());
+ processing_iterator = processing_keys.begin();
+ return keys;
+}
+
+
+StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next()
+{
+ std::lock_guard lock(mutex);
+ if (processing_iterator != processing_keys.end())
+ {
+ return *processing_iterator++;
+ }
+
+ return KeyWithInfo();
+}
+
+Block StorageS3QueueSource::getHeader(Block sample_block, const std::vector & requested_virtual_columns)
+{
+ for (const auto & virtual_column : requested_virtual_columns)
+ sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
+
+ return sample_block;
+}
+
+StorageS3QueueSource::StorageS3QueueSource(
+ const std::vector & requested_virtual_columns_,
+ const String & format_,
+ String name_,
+ const Block & sample_block_,
+ ContextPtr context_,
+ std::optional format_settings_,
+ const ColumnsDescription & columns_,
+ UInt64 max_block_size_,
+ const S3Settings::RequestSettings & request_settings_,
+ String compression_hint_,
+ const std::shared_ptr & client_,
+ const String & bucket_,
+ const String & version_id_,
+ std::shared_ptr file_iterator_,
+ std::shared_ptr files_metadata_,
+ const S3QueueAction & action_,
+ const size_t download_thread_num_)
+ : ISource(getHeader(sample_block_, requested_virtual_columns_))
+ , WithContext(context_)
+ , name(std::move(name_))
+ , bucket(bucket_)
+ , version_id(version_id_)
+ , format(format_)
+ , columns_desc(columns_)
+ , request_settings(request_settings_)
+ , client(client_)
+ , files_metadata(files_metadata_)
+ , requested_virtual_columns(requested_virtual_columns_)
+ , file_iterator(file_iterator_)
+ , action(action_)
+{
+    internal_source = std::make_shared<StorageS3Source>(
+ requested_virtual_columns_,
+ format_,
+ name_,
+ sample_block_,
+ context_,
+ format_settings_,
+ columns_,
+ max_block_size_,
+ request_settings_,
+ compression_hint_,
+ client_,
+ bucket_,
+ version_id_,
+ file_iterator,
+ download_thread_num_);
+ reader = std::move(internal_source->reader);
+ if (reader)
+ reader_future = std::move(internal_source->reader_future);
+}
+
+StorageS3QueueSource::~StorageS3QueueSource()
+{
+ internal_source->create_reader_pool.wait();
+}
+
+String StorageS3QueueSource::getName() const
+{
+ return name;
+}
+
+Chunk StorageS3QueueSource::generate()
+{
+ auto file_progress = getContext()->getFileProgressCallback();
+ while (true)
+ {
+ if (isCancelled() || !reader)
+ {
+ if (reader)
+ reader->cancel();
+ break;
+ }
+
+ Chunk chunk;
+ bool success_in_pulling = false;
+ try
+ {
+ if (reader->pull(chunk))
+ {
+ UInt64 num_rows = chunk.getNumRows();
+ auto file_path = reader.getPath();
+
+ for (const auto & virtual_column : requested_virtual_columns)
+ {
+ if (virtual_column.name == "_path")
+ {
+ chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
+ }
+ else if (virtual_column.name == "_file")
+ {
+ size_t last_slash_pos = file_path.find_last_of('/');
+ auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
+ chunk.addColumn(column->convertToFullColumnIfConst());
+ }
+ }
+ success_in_pulling = true;
+ }
+ }
+ catch (const Exception & e)
+ {
+ LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
+ files_metadata->setFileFailed(reader.getFile(), e.message());
+ success_in_pulling = false;
+ }
+ if (success_in_pulling)
+ {
+ applyActionAfterProcessing(reader.getFile());
+ files_metadata->setFileProcessed(reader.getFile());
+ return chunk;
+ }
+
+
+ assert(reader_future.valid());
+ reader = reader_future.get();
+
+ if (!reader)
+ break;
+
+ /// Even if task is finished the thread may be not freed in pool.
+ /// So wait until it will be freed before scheduling a new task.
+ internal_source->create_reader_pool.wait();
+ reader_future = internal_source->createReaderAsync();
+ }
+
+ return {};
+}
+
+
+void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
+{
+ switch (action)
+ {
+ case S3QueueAction::DELETE:
+ deleteProcessedObject(file_path);
+ break;
+ case S3QueueAction::KEEP:
+ break;
+ }
+}
+
+void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
+{
+ LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket);
+
+ S3::DeleteObjectRequest request;
+ request.WithKey(file_path).WithBucket(bucket);
+ auto outcome = client->DeleteObject(request);
+ if (!outcome.IsSuccess())
+ {
+ const auto & err = outcome.GetError();
+ LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast(err.GetErrorType()));
+ }
+ else
+ {
+ LOG_TRACE(log, "Object with path {} was removed from S3", file_path);
+ }
+}
+
+}
+
+#endif
diff --git a/src/Storages/S3Queue/S3QueueSource.h b/src/Storages/S3Queue/S3QueueSource.h
new file mode 100644
index 00000000000..a85fce46ad8
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueSource.h
@@ -0,0 +1,124 @@
+#pragma once
+#include "config.h"
+
+#if USE_AWS_S3
+
+# include
+
+# include
+
+# include
+# include
+# include
+# include
+
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+
+
+namespace DB
+{
+
+
+class StorageS3QueueSource : public ISource, WithContext
+{
+public:
+ using IIterator = StorageS3Source::IIterator;
+ using DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator;
+ using KeysWithInfo = StorageS3Source::KeysWithInfo;
+ using KeyWithInfo = StorageS3Source::KeyWithInfo;
+ class QueueGlobIterator : public IIterator
+ {
+ public:
+ QueueGlobIterator(
+ const S3::Client & client_,
+ const S3::URI & globbed_uri_,
+ ASTPtr query,
+ const Block & virtual_header,
+ ContextPtr context,
+ UInt64 & max_poll_size_,
+ const S3Settings::RequestSettings & request_settings_ = {});
+
+ KeyWithInfo next() override;
+
+ Strings
+ filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set & exclude_keys, const String & max_file = "");
+
+ private:
+ UInt64 max_poll_size;
+ KeysWithInfo keys_buf;
+ KeysWithInfo processing_keys;
+ mutable std::mutex mutex;
+        std::unique_ptr<DisclosedGlobIterator> glob_iterator;
+        std::vector<KeyWithInfo>::iterator processing_iterator;
+
+ Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSourceIterator");
+ };
+
+ static Block getHeader(Block sample_block, const std::vector & requested_virtual_columns);
+
+ StorageS3QueueSource(
+ const std::vector & requested_virtual_columns_,
+ const String & format,
+ String name_,
+ const Block & sample_block,
+ ContextPtr context_,
+ std::optional format_settings_,
+ const ColumnsDescription & columns_,
+ UInt64 max_block_size_,
+ const S3Settings::RequestSettings & request_settings_,
+ String compression_hint_,
+ const std::shared_ptr & client_,
+ const String & bucket,
+ const String & version_id,
+ std::shared_ptr file_iterator_,
+ std::shared_ptr files_metadata_,
+ const S3QueueAction & action_,
+ size_t download_thread_num);
+
+ ~StorageS3QueueSource() override;
+
+ String getName() const override;
+
+ Chunk generate() override;
+
+
+private:
+ String name;
+ String bucket;
+ String version_id;
+ String format;
+ ColumnsDescription columns_desc;
+ S3Settings::RequestSettings request_settings;
+ std::shared_ptr client;
+
+ std::shared_ptr files_metadata;
+ using ReaderHolder = StorageS3Source::ReaderHolder;
+ ReaderHolder reader;
+
+ std::vector requested_virtual_columns;
+ std::shared_ptr file_iterator;
+ const S3QueueAction action;
+
+ Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource");
+
+ std::future reader_future;
+
+ mutable std::mutex mutex;
+
+ std::shared_ptr internal_source;
+ void deleteProcessedObject(const String & file_path);
+ void applyActionAfterProcessing(const String & file_path);
+};
+
+}
+#endif
diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.cpp b/src/Storages/S3Queue/S3QueueTableMetadata.cpp
new file mode 100644
index 00000000000..23eebb6ded9
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueTableMetadata.cpp
@@ -0,0 +1,115 @@
+#include
+
+#if USE_AWS_S3
+
+# include
+# include
+# include
+# include
+# include
+# include
+
+
+namespace DB
+{
+
+namespace ErrorCodes
+{
+ extern const int METADATA_MISMATCH;
+}
+
+S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings)
+{
+ format_name = configuration.format;
+ after_processing = engine_settings.after_processing.toString();
+ mode = engine_settings.mode.toString();
+ s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
+ s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
+}
+
+
+String S3QueueTableMetadata::toString() const
+{
+ Poco::JSON::Object json;
+ json.set("after_processing", after_processing);
+ json.set("mode", mode);
+ json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
+ json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
+ json.set("format_name", format_name);
+
+ std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
+ oss.exceptions(std::ios::failbit);
+ Poco::JSON::Stringifier::stringify(json, oss);
+ return oss.str();
+}
+
+void S3QueueTableMetadata::read(const String & metadata_str)
+{
+ Poco::JSON::Parser parser;
+    auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
+    after_processing = json->getValue<String>("after_processing");
+    mode = json->getValue<String>("mode");
+    s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
+    s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
+    format_name = json->getValue<String>("format_name");
+}
+
+S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
+{
+ S3QueueTableMetadata metadata;
+ metadata.read(metadata_str);
+ return metadata;
+}
+
+
+void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
+{
+ if (after_processing != from_zk.after_processing)
+ throw Exception(
+ ErrorCodes::METADATA_MISMATCH,
+ "Existing table metadata in ZooKeeper differs "
+ "in action after processing. Stored in ZooKeeper: {}, local: {}",
+ DB::toString(from_zk.after_processing),
+ DB::toString(after_processing));
+
+    if (mode != from_zk.mode)
+        throw Exception(
+            ErrorCodes::METADATA_MISMATCH,
+            "Existing table metadata in ZooKeeper differs in engine mode. "
+            "Stored in ZooKeeper: {}, local: {}",
+            from_zk.mode,
+            mode);
+
+ if (s3queue_tracked_files_limit != from_zk.s3queue_tracked_files_limit)
+ throw Exception(
+ ErrorCodes::METADATA_MISMATCH,
+ "Existing table metadata in ZooKeeper differs in max set size. "
+ "Stored in ZooKeeper: {}, local: {}",
+ from_zk.s3queue_tracked_files_limit,
+ s3queue_tracked_files_limit);
+
+ if (s3queue_tracked_file_ttl_sec != from_zk.s3queue_tracked_file_ttl_sec)
+ throw Exception(
+ ErrorCodes::METADATA_MISMATCH,
+ "Existing table metadata in ZooKeeper differs in max set age. "
+ "Stored in ZooKeeper: {}, local: {}",
+ from_zk.s3queue_tracked_file_ttl_sec,
+ s3queue_tracked_file_ttl_sec);
+
+ if (format_name != from_zk.format_name)
+ throw Exception(
+ ErrorCodes::METADATA_MISMATCH,
+ "Existing table metadata in ZooKeeper differs in format name. "
+ "Stored in ZooKeeper: {}, local: {}",
+ from_zk.format_name,
+ format_name);
+}
+
+void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
+{
+ checkImmutableFieldsEquals(from_zk);
+}
+
+}
+
+#endif
diff --git a/src/Storages/S3Queue/S3QueueTableMetadata.h b/src/Storages/S3Queue/S3QueueTableMetadata.h
new file mode 100644
index 00000000000..4b6fbc54825
--- /dev/null
+++ b/src/Storages/S3Queue/S3QueueTableMetadata.h
@@ -0,0 +1,43 @@
+#pragma once
+
+#if USE_AWS_S3
+
+# include
+# include
+# include
+
+namespace DB
+{
+
+class WriteBuffer;
+class ReadBuffer;
+
+/** The basic parameters of S3Queue table engine for saving in ZooKeeper.
+ * Lets you verify that they match local ones.
+ */
+struct S3QueueTableMetadata
+{
+ String format_name;
+ String after_processing;
+ String mode;
+ UInt64 s3queue_tracked_files_limit;
+ UInt64 s3queue_tracked_file_ttl_sec;
+
+ S3QueueTableMetadata() = default;
+ S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings);
+
+ void read(const String & metadata_str);
+ static S3QueueTableMetadata parse(const String & metadata_str);
+
+ String toString() const;
+
+ void checkEquals(const S3QueueTableMetadata & from_zk) const;
+
+private:
+ void checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const;
+};
+
+
+}
+
+#endif
diff --git a/src/Storages/S3Queue/StorageS3Queue.cpp b/src/Storages/S3Queue/StorageS3Queue.cpp
new file mode 100644
index 00000000000..87bff398172
--- /dev/null
+++ b/src/Storages/S3Queue/StorageS3Queue.cpp
@@ -0,0 +1,711 @@
+#include "config.h"
+
+
+#if USE_AWS_S3
+
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include "IO/ParallelReadBuffer.h"
+
+# include
+
+# include
+
+# include
+
+# include
+# include
+
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+
+
+# include
+
+# include
+# include
+# include
+
+# include
+
+# include
+
+# include
+
+# include
+# include
+# include
+# include
+
+namespace fs = std::filesystem;
+
+namespace ProfileEvents
+{
+extern const Event S3DeleteObjects;
+extern const Event S3ListObjects;
+}
+
+namespace DB
+{
+
+static const String PARTITION_ID_WILDCARD = "{_partition_id}";
+static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
+
+namespace ErrorCodes
+{
+ extern const int LOGICAL_ERROR;
+ extern const int BAD_ARGUMENTS;
+ extern const int S3_ERROR;
+ extern const int NOT_IMPLEMENTED;
+ extern const int QUERY_NOT_ALLOWED;
+ extern const int NO_ZOOKEEPER;
+ extern const int REPLICA_ALREADY_EXISTS;
+ extern const int INCOMPATIBLE_COLUMNS;
+}
+
+
+StorageS3Queue::StorageS3Queue(
+ std::unique_ptr s3queue_settings_,
+ const StorageS3::Configuration & configuration_,
+ const StorageID & table_id_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment,
+ ContextPtr context_,
+ std::optional format_settings_,
+ ASTPtr partition_by_)
+ : IStorage(table_id_)
+ , WithContext(context_)
+ , s3queue_settings(std::move(s3queue_settings_))
+ , after_processing(s3queue_settings->after_processing)
+ , configuration{configuration_}
+ , reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
+ , format_settings(format_settings_)
+ , partition_by(partition_by_)
+ , log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
+{
+ if (configuration.url.key.ends_with('/'))
+ configuration.url.key += '*';
+
+ if (!withGlobs())
+ throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
+
+ String setting_zk_path = s3queue_settings->keeper_path;
+ if (setting_zk_path.empty())
+ {
+ auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
+ bool is_in_replicated_database = database->getEngineName() == "Replicated";
+
+ auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
+ String zk_path_prefix;
+
+ if (!default_path.empty())
+ {
+ zk_path_prefix = default_path;
+ }
+ else if (is_in_replicated_database)
+ {
+ LOG_INFO(log, "S3Queue engine zookeeper path is not specified. "
+ "Using replicated database zookeeper path");
+
+            zk_path_prefix = fs::path(assert_cast<DatabaseReplicated *>(database.get())->getZooKeeperPath()) / "s3queue";
+ }
+ else
+ {
+ throw Exception(ErrorCodes::NO_ZOOKEEPER,
+ "S3Queue keeper_path engine setting not specified, "
+ "s3queue_default_zookeeper_path_prefix not specified");
+ }
+
+ zk_path = zkutil::extractZooKeeperPath(
+ fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
+ }
+ else
+ {
+ /// We do not add table uuid here on purpose.
+ zk_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
+ }
+
+ LOG_INFO(log, "Using zookeeper path: {}", zk_path);
+
+ FormatFactory::instance().checkFormatName(configuration.format);
+ context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
+ StorageInMemoryMetadata storage_metadata;
+ configuration.update(context_);
+
+ if (columns_.empty())
+ {
+ auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
+ storage_metadata.setColumns(columns);
+ }
+ else
+ storage_metadata.setColumns(columns_);
+
+ storage_metadata.setConstraints(constraints_);
+ storage_metadata.setComment(comment);
+ setInMemoryMetadata(storage_metadata);
+
+ auto metadata_snapshot = getInMemoryMetadataPtr();
+ const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
+
+ if (!is_first_replica)
+ {
+ checkTableStructure(zk_path, metadata_snapshot);
+ }
+
+    files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
+
+ auto default_virtuals = NamesAndTypesList{
+ {"_path", std::make_shared(std::make_shared())},
+ {"_file", std::make_shared(std::make_shared())}};
+
+ auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
+ virtual_columns = getVirtualsForStorage(columns, default_virtuals);
+ for (const auto & column : virtual_columns)
+ virtual_block.insert({column.type->createColumn(), column.type, column.name});
+
+ auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
+ task = std::make_shared(std::move(poll_thread));
+}
+
+
+bool StorageS3Queue::supportsSubcolumns() const
+{
+ return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
+}
+
+bool StorageS3Queue::supportsSubsetOfColumns() const
+{
+ return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
+}
+
+Pipe StorageS3Queue::read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr local_context,
+ QueryProcessingStage::Enum /*processed_stage*/,
+ size_t max_block_size,
+ size_t /* num_streams */)
+{
+ if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
+ throw Exception(
+ ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
+
+ if (mv_attached)
+ throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
+
+ auto query_configuration = updateConfigurationAndGetCopy(local_context);
+
+ Pipes pipes;
+
+ std::unordered_set column_names_set(column_names.begin(), column_names.end());
+ std::vector requested_virtual_columns;
+
+ for (const auto & virtual_column : getVirtuals())
+ {
+ if (column_names_set.contains(virtual_column.name))
+ requested_virtual_columns.push_back(virtual_column);
+ }
+
+ std::shared_ptr iterator_wrapper = createFileIterator(local_context, query_info.query);
+
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (supportsSubsetOfColumns())
+ {
+ auto fetch_columns = column_names;
+ const auto & virtuals = getVirtuals();
+ std::erase_if(
+ fetch_columns,
+ [&](const String & col)
+ {
+ return std::any_of(
+ virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
+ });
+
+ if (fetch_columns.empty())
+ fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
+
+ columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
+ block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+ columns_description = storage_snapshot->metadata->getColumns();
+ block_for_format = storage_snapshot->metadata->getSampleBlock();
+ }
+
+ const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
+
+ return Pipe(std::make_shared(
+ requested_virtual_columns,
+ configuration.format,
+ getName(),
+ block_for_format,
+ local_context,
+ format_settings,
+ columns_description,
+ max_block_size,
+ query_configuration.request_settings,
+ configuration.compression_method,
+ query_configuration.client,
+ query_configuration.url.bucket,
+ query_configuration.url.version_id,
+ iterator_wrapper,
+ files_metadata,
+ after_processing,
+ max_download_threads));
+}
+
+SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool)
+{
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
+}
+
+void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
+{
+ throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
+}
+
+NamesAndTypesList StorageS3Queue::getVirtuals() const
+{
+ return virtual_columns;
+}
+
+bool StorageS3Queue::supportsPartitionBy() const
+{
+ return true;
+}
+
+void StorageS3Queue::startup()
+{
+ if (task)
+ task->holder->activateAndSchedule();
+}
+
+void StorageS3Queue::shutdown()
+{
+ shutdown_called = true;
+ if (task)
+ {
+ task->stream_cancelled = true;
+ task->holder->deactivate();
+ }
+}
+
+size_t StorageS3Queue::getTableDependentCount() const
+{
+ auto table_id = getStorageID();
+ // Check if at least one direct dependency is attached
+ return DatabaseCatalog::instance().getDependentViews(table_id).size();
+}
+
+bool StorageS3Queue::hasDependencies(const StorageID & table_id)
+{
+ // Check if all dependencies are attached
+ auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
+ LOG_TEST(log, "Number of attached views {} for {}", view_ids.size(), table_id.getNameForLogs());
+
+ if (view_ids.empty())
+ return false;
+
+ // Check the dependencies are ready?
+ for (const auto & view_id : view_ids)
+ {
+ auto view = DatabaseCatalog::instance().tryGetTable(view_id, getContext());
+ if (!view)
+ return false;
+
+ // If it materialized view, check it's target table
+        auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
+ if (materialized_view && !materialized_view->tryGetTargetTable())
+ return false;
+ }
+
+ return true;
+}
+
+void StorageS3Queue::threadFunc()
+{
+ bool reschedule = true;
+ try
+ {
+ auto table_id = getStorageID();
+
+ auto dependencies_count = getTableDependentCount();
+ if (dependencies_count)
+ {
+ auto start_time = std::chrono::steady_clock::now();
+
+ mv_attached.store(true);
+ // Keep streaming as long as there are attached views and streaming is not cancelled
+ while (!task->stream_cancelled)
+ {
+ if (!hasDependencies(table_id))
+ {
+ /// For this case, we can not wait for watch thread to wake up
+ reschedule = true;
+ break;
+ }
+
+ LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
+ streamToViews();
+
+ auto ts = std::chrono::steady_clock::now();
+                auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
+ if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
+ {
+ LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
+ reschedule = true;
+ break;
+ }
+
+ reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
+ }
+ }
+ }
+ catch (...)
+ {
+ tryLogCurrentException(__PRETTY_FUNCTION__);
+ }
+
+ mv_attached.store(false);
+
+ if (reschedule && !shutdown_called)
+ {
+ LOG_TRACE(log, "Reschedule S3 Queue thread func.");
+ /// Reschedule with backoff.
+ if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
+ reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
+ task->holder->scheduleAfter(reschedule_processing_interval_ms);
+ }
+}
+
+
+void StorageS3Queue::streamToViews()
+{
+ auto table_id = getStorageID();
+ auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
+ if (!table)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
+
+ auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
+
+ // Create an INSERT query for streaming data
+    auto insert = std::make_shared<ASTInsertQuery>();
+ insert->table_id = table_id;
+
+ size_t block_size = 100;
+
+ auto s3queue_context = Context::createCopy(getContext());
+ s3queue_context->makeQueryContext();
+ auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
+
+ // Create a stream for each consumer and join them in a union stream
+ // Only insert into dependent views and expect that input blocks contain virtual columns
+ InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
+ auto block_io = interpreter.execute();
+ auto column_names = block_io.pipeline.getHeader().getNames();
+
+ // Create a stream for each consumer and join them in a union stream
+ std::vector requested_virtual_columns;
+
+ for (const auto & virtual_column : getVirtuals())
+ {
+ requested_virtual_columns.push_back(virtual_column);
+ }
+
+ std::shared_ptr iterator_wrapper = createFileIterator(s3queue_context, nullptr);
+ ColumnsDescription columns_description;
+ Block block_for_format;
+ if (supportsSubsetOfColumns())
+ {
+ auto fetch_columns = column_names;
+ const auto & virtuals = getVirtuals();
+ std::erase_if(
+ fetch_columns,
+ [&](const String & col)
+ {
+ return std::any_of(
+ virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
+ });
+
+ if (fetch_columns.empty())
+ fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
+
+ columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
+ block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
+ }
+ else
+ {
+ columns_description = storage_snapshot->metadata->getColumns();
+ block_for_format = storage_snapshot->metadata->getSampleBlock();
+ }
+
+ const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
+
+ Pipes pipes;
+
+ auto pipe = Pipe(std::make_shared(
+ requested_virtual_columns,
+ configuration.format,
+ getName(),
+ block_for_format,
+ s3queue_context,
+ format_settings,
+ columns_description,
+ block_size,
+ query_configuration.request_settings,
+ configuration.compression_method,
+ query_configuration.client,
+ query_configuration.url.bucket,
+ query_configuration.url.version_id,
+ iterator_wrapper,
+ files_metadata,
+ after_processing,
+ max_download_threads));
+
+
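+ /// Complete the INSERT pipeline with the S3Queue source and run it to completion,
+ /// counting the rows read through the progress callback.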
+ std::atomic_size_t rows = 0;
+ {
+ block_io.pipeline.complete(std::move(pipe));
+ block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
+ CompletedPipelineExecutor executor(block_io.pipeline);
+ executor.execute();
+ }
+}
+
+StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
+{
+ configuration.update(local_context);
+ return configuration;
+}
+
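+/// Return a ZooKeeper client, re-creating it under a mutex if the previous session has expired.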
+zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
+{
+ std::lock_guard lock{zk_mutex};
+ if (!zk_client || zk_client->expired())
+ {
+ zk_client = getContext()->getZooKeeper();
+ zk_client->sync(zk_path);
+ }
+ return zk_client;
+}
+
+
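+/// Create the Keeper nodes backing the queue (metadata, processed, failed, processing, columns).
+/// Returns true if this replica created them, false if they already existed; retries on concurrent creation.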
+bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
+{
+ auto zookeeper = getZooKeeper();
+ zookeeper->createAncestors(zk_path);
+
+ for (size_t i = 0; i < zk_create_table_retries; ++i)
+ {
+ Coordination::Requests ops;
+ bool is_first_replica = true;
+ if (zookeeper->exists(zk_path + "/metadata"))
+ {
+ if (!zookeeper->exists(zk_path + "/processing"))
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
+ LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path);
+ is_first_replica = false;
+ }
+ else
+ {
+ String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
+ ops.emplace_back(zkutil::makeCreateRequest(
+ zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
+
+ ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
+ }
+
+ Coordination::Responses responses;
+ auto code = zookeeper->tryMulti(ops, responses);
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
+ continue;
+ }
+ else if (code != Coordination::Error::ZOK)
+ {
+ zkutil::KeeperMultiException::check(code, ops, responses);
+ }
+
+ return is_first_replica;
+ }
+
+ throw Exception(
+ ErrorCodes::REPLICA_ALREADY_EXISTS,
+ "Cannot create table, because it is created concurrently every time or because "
+ "of wrong zk_path or because of logical error");
+}
+
+
+/** Verify that list of columns and table settings match those specified in ZK (/metadata).
+ * If not, throw an exception.
+ */
+void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
+{
+ auto zookeeper = getZooKeeper();
+
+ S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
+
+ Coordination::Stat metadata_stat;
+ String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
+ auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
+ old_metadata.checkEquals(metadata_from_zk);
+
+ Coordination::Stat columns_stat;
+ auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
+
+ const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
+ if (columns_from_zk != old_columns)
+ {
+ throw Exception(
+ ErrorCodes::INCOMPATIBLE_COLUMNS,
+ "Table columns structure in ZooKeeper is different from local table structure. Local columns:\n"
+ "{}\nZookeeper columns:\n{}",
+ old_columns.toString(),
+ columns_from_zk.toString());
+ }
+}
+
+
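+/// Build a file iterator over the bucket and filter out files already tracked in Keeper as
+/// processed, failed or processing (in ordered mode, everything up to the max processed file);
+/// the remaining files are marked as processing before streaming starts.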
+std::shared_ptr
+StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
+{
+ auto it = std::make_shared(
+ *configuration.client,
+ configuration.url,
+ query,
+ virtual_block,
+ local_context,
+ s3queue_settings->s3queue_polling_size.value,
+ configuration.request_settings);
+
+ auto zookeeper = getZooKeeper();
+ auto lock = files_metadata->acquireLock(zookeeper);
+ S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
+
+ Strings files_to_process;
+ if (s3queue_settings->mode == S3QueueMode::UNORDERED)
+ {
+ files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
+ }
+ else
+ {
+ String max_processed_file = files_metadata->getMaxProcessedFile();
+ files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
+ }
+
+ LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
+
+ files_metadata->setFilesProcessing(files_to_process);
+ return it;
+}
+
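+/// On DROP TABLE, also remove the queue metadata stored in Keeper.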
+void StorageS3Queue::drop()
+{
+ auto zookeeper = getZooKeeper();
+ if (zookeeper->exists(zk_path))
+ zookeeper->removeRecursive(zk_path);
+}
+
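+/// Register the engine in the StorageFactory: parse the S3 configuration from the engine
+/// arguments, split SETTINGS into S3Queue settings and format settings, and construct the storage.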
+void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
+{
+ factory.registerStorage(
+ name,
+ [](const StorageFactory::Arguments & args)
+ {
+ auto & engine_args = args.engine_args;
+ if (engine_args.empty())
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
+ auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
+
+ // Use format settings from global server context + settings from
+ // the SETTINGS clause of the create query. Settings from current
+ // session and user are ignored.
+ std::optional<FormatSettings> format_settings;
+
+ auto s3queue_settings = std::make_unique<S3QueueSettings>();
+ if (args.storage_def->settings)
+ {
+ s3queue_settings->loadFromQuery(*args.storage_def);
+ FormatFactorySettings user_format_settings;
+
+ // Apply changed settings from global context, but ignore the
+ // unknown ones, because we only have the format settings here.
+ const auto & changes = args.getContext()->getSettingsRef().changes();
+ for (const auto & change : changes)
+ {
+ if (user_format_settings.has(change.name))
+ user_format_settings.set(change.name, change.value);
+ else
+ LOG_TRACE(&Poco::Logger::get("StorageS3Queue"), "Remove: {}", change.name);
+ args.storage_def->settings->changes.removeSetting(change.name);
+ }
+
+ for (const auto & change : args.storage_def->settings->changes)
+ {
+ if (user_format_settings.has(change.name))
+ user_format_settings.applyChange(change);
+ }
+ format_settings = getFormatSettings(args.getContext(), user_format_settings);
+ }
+ else
+ {
+ format_settings = getFormatSettings(args.getContext());
+ }
+
+ ASTPtr partition_by;
+ if (args.storage_def->partition_by)
+ partition_by = args.storage_def->partition_by->clone();
+
+ return std::make_shared<StorageS3Queue>(
+ std::move(s3queue_settings),
+ std::move(configuration),
+ args.table_id,
+ args.columns,
+ args.constraints,
+ args.comment,
+ args.getContext(),
+ format_settings,
+ partition_by);
+ },
+ {
+ .supports_settings = true,
+ .supports_sort_order = true, // for partition by
+ .supports_schema_inference = true,
+ .source_access_type = AccessType::S3,
+ });
+}
+
+void registerStorageS3Queue(StorageFactory & factory)
+{
+ return registerStorageS3QueueImpl("S3Queue", factory);
+}
+
+}
+
+
+#endif
diff --git a/src/Storages/S3Queue/StorageS3Queue.h b/src/Storages/S3Queue/StorageS3Queue.h
new file mode 100644
index 00000000000..9737d5fcefa
--- /dev/null
+++ b/src/Storages/S3Queue/StorageS3Queue.h
@@ -0,0 +1,146 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_AWS_S3
+
+# include
+
+# include
+# include
+
+# include
+# include
+# include
+# include
+# include
+# include
+
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+# include
+
+namespace Aws::S3
+{
+class Client;
+}
+
+namespace DB
+{
+
+
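+/// Table engine that streams files from an S3 bucket, tracking which files have been
+/// processed (or failed) in ZooKeeper under `keeper_path`, and pushing new data into
+/// attached materialized views from a background task.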
+class StorageS3Queue : public IStorage, WithContext
+{
+public:
+ using Configuration = typename StorageS3::Configuration;
+
+ StorageS3Queue(
+ std::unique_ptr<S3QueueSettings> s3queue_settings_,
+ const Configuration & configuration_,
+ const StorageID & table_id_,
+ const ColumnsDescription & columns_,
+ const ConstraintsDescription & constraints_,
+ const String & comment,
+ ContextPtr context_,
+ std::optional<FormatSettings> format_settings_,
+ ASTPtr partition_by_ = nullptr);
+
+ String getName() const override { return "S3Queue"; }
+
+ Pipe read(
+ const Names & column_names,
+ const StorageSnapshotPtr & storage_snapshot,
+ SelectQueryInfo & query_info,
+ ContextPtr context,
+ QueryProcessingStage::Enum processed_stage,
+ size_t max_block_size,
+ size_t num_streams) override;
+
+ SinkToStoragePtr write(
+ const ASTPtr & query,
+ const StorageMetadataPtr & metadata_snapshot,
+ ContextPtr context,
+ bool async_insert) override;
+
+ void truncate(
+ const ASTPtr & /*query*/,
+ const StorageMetadataPtr & /*metadata_snapshot*/,
+ ContextPtr /*local_context*/,
+ TableExclusiveLockHolder &) override;
+
+ NamesAndTypesList getVirtuals() const override;
+
+ bool supportsPartitionBy() const override;
+
+ const auto & getFormatName() const { return configuration.format; }
+
+ const String & getZooKeeperPath() const { return zk_path; }
+
+ zkutil::ZooKeeperPtr getZooKeeper() const;
+
+private:
+ const std::unique_ptr<S3QueueSettings> s3queue_settings;
+ const S3QueueAction after_processing;
+
+ std::shared_ptr<S3QueueFilesMetadata> files_metadata;
+ Configuration configuration;
+ NamesAndTypesList virtual_columns;
+ Block virtual_block;
+ UInt64 reschedule_processing_interval_ms;
+
+ std::optional<FormatSettings> format_settings;
+ ASTPtr partition_by;
+
+ String zk_path;
+ mutable zkutil::ZooKeeperPtr zk_client;
+ mutable std::mutex zk_mutex;
+
+ std::atomic mv_attached = false;
+ std::atomic shutdown_called{false};
+ Poco::Logger * log;
+
+ bool supportsSubcolumns() const override;
+ bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }
+
+ void threadFunc();
+ size_t getTableDependentCount() const;
+ bool hasDependencies(const StorageID & table_id);
+
+ void startup() override;
+ void shutdown() override;
+ void drop() override;
+
+ struct TaskContext
+ {
+ BackgroundSchedulePool::TaskHolder holder;
+ std::atomic stream_cancelled{false};
+ explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
+ };
+ std::shared_ptr<TaskContext> task;
+
+ bool supportsSubsetOfColumns() const override;
+
+ const UInt32 zk_create_table_retries = 1000;
+ bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
+ void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
+
+ using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;
+
+ std::shared_ptr
+ createFileIterator(ContextPtr local_context, ASTPtr query);
+
+ void streamToViews();
+ Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
+};
+
+}
+
+#endif
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index ebce3a7aeca..0b615cd795b 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -596,7 +596,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique(*pipeline);
- return ReaderHolder{fs::path(bucket) / key_with_info.key, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
+ return ReaderHolder{key_with_info.key, bucket, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
}
std::future StorageS3Source::createReaderAsync()
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index d001a86842e..e08c01d0c91 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -141,6 +141,8 @@ public:
Chunk generate() override;
private:
+ friend class StorageS3QueueSource;
+
String name;
String bucket;
String version_id;
@@ -157,12 +159,14 @@ private:
{
public:
ReaderHolder(
- String path_,
+ String key_,
+ String bucket_,
std::unique_ptr read_buf_,
std::shared_ptr input_format_,
std::unique_ptr pipeline_,
std::unique_ptr reader_)
- : path(std::move(path_))
+ : key(std::move(key_))
+ , bucket(std::move(bucket_))
, read_buf(std::move(read_buf_))
, input_format(std::move(input_format_))
, pipeline(std::move(pipeline_))
@@ -187,19 +191,22 @@ private:
pipeline = std::move(other.pipeline);
input_format = std::move(other.input_format);
read_buf = std::move(other.read_buf);
- path = std::move(other.path);
+ key = std::move(other.key);
+ bucket = std::move(other.bucket);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
- const String & getPath() const { return path; }
+ String getPath() const { return fs::path(bucket) / key; }
+ const String & getFile() const { return key; }
const IInputFormat * getInputFormat() const { return input_format.get(); }
private:
- String path;
+ String key;
+ String bucket;
std::unique_ptr read_buf;
std::shared_ptr input_format;
std::unique_ptr pipeline;
@@ -323,6 +330,7 @@ protected:
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
+ friend class StorageS3Queue;
Configuration configuration;
std::mutex configuration_update_mutex;
diff --git a/src/Storages/registerStorages.cpp b/src/Storages/registerStorages.cpp
index a4f1d963704..e5b1c8e8744 100644
--- a/src/Storages/registerStorages.cpp
+++ b/src/Storages/registerStorages.cpp
@@ -34,6 +34,8 @@ void registerStorageS3(StorageFactory & factory);
void registerStorageCOS(StorageFactory & factory);
void registerStorageOSS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
+void registerStorageS3Queue(StorageFactory & factory);
+
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
#endif
@@ -133,6 +135,7 @@ void registerStorages()
registerStorageCOS(factory);
registerStorageOSS(factory);
registerStorageHudi(factory);
+ registerStorageS3Queue(factory);
#if USE_PARQUET
registerStorageDeltaLake(factory);
diff --git a/tests/integration/test_storage_s3_queue/__init__.py b/tests/integration/test_storage_s3_queue/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_storage_s3_queue/configs/defaultS3.xml b/tests/integration/test_storage_s3_queue/configs/defaultS3.xml
new file mode 100644
index 00000000000..7dac6d9fbb5
--- /dev/null
+++ b/tests/integration/test_storage_s3_queue/configs/defaultS3.xml
@@ -0,0 +1,11 @@
+
+
+
+ http://resolver:8080
+ Authorization: Bearer TOKEN
+
+
+ http://resolver:8080/root-with-auth/restricteddirectory/
+
+
+
diff --git a/tests/integration/test_storage_s3_queue/configs/named_collections.xml b/tests/integration/test_storage_s3_queue/configs/named_collections.xml
new file mode 100644
index 00000000000..64674e2a3e3
--- /dev/null
+++ b/tests/integration/test_storage_s3_queue/configs/named_collections.xml
@@ -0,0 +1,43 @@
+
+
+
+ http://minio1:9001/root/test_table
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test_parquet
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test_parquet_gz
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test_orc
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test_native
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test.arrow
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test.parquet
+ minio
+ minio123
+
+
+ http://minio1:9001/root/test_cache4.jsonl
+ true
+
+
+
diff --git a/tests/integration/test_storage_s3_queue/configs/users.xml b/tests/integration/test_storage_s3_queue/configs/users.xml
new file mode 100644
index 00000000000..2cef0a6de3c
--- /dev/null
+++ b/tests/integration/test_storage_s3_queue/configs/users.xml
@@ -0,0 +1,7 @@
+
+
+
+ 1
+
+
+
diff --git a/tests/integration/test_storage_s3_queue/test.py b/tests/integration/test_storage_s3_queue/test.py
new file mode 100644
index 00000000000..77e32e2922c
--- /dev/null
+++ b/tests/integration/test_storage_s3_queue/test.py
@@ -0,0 +1,873 @@
+import io
+import logging
+import os
+import random
+import time
+
+import pytest
+from helpers.client import QueryRuntimeException
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
+import json
+
+"""
+export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
+export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
+export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
+export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
+
+"""
+
+
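+# Attach a read-write policy to the default MinIO bucket and (re)create the
+# restricted bucket that most tests read from with explicit credentials.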
+def prepare_s3_bucket(started_cluster):
+ # Allows read-write access for bucket without authorization.
+ bucket_read_write_policy = {
+ "Version": "2012-10-17",
+ "Statement": [
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:GetBucketLocation",
+ "Resource": "arn:aws:s3:::root",
+ },
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:ListBucket",
+ "Resource": "arn:aws:s3:::root",
+ },
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:GetObject",
+ "Resource": "arn:aws:s3:::root/*",
+ },
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:PutObject",
+ "Resource": "arn:aws:s3:::root/*",
+ },
+ {
+ "Sid": "",
+ "Effect": "Allow",
+ "Principal": {"AWS": "*"},
+ "Action": "s3:DeleteObject",
+ "Resource": "arn:aws:s3:::root/*",
+ },
+ ],
+ }
+
+ minio_client = started_cluster.minio_client
+ minio_client.set_bucket_policy(
+ started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
+ )
+
+ started_cluster.minio_restricted_bucket = "{}-with-auth".format(
+ started_cluster.minio_bucket
+ )
+ if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
+ minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
+
+ minio_client.make_bucket(started_cluster.minio_restricted_bucket)
+
+
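+# Per-test setup: recreate the `test` database on both instances and clear the
+# restricted bucket so every test starts from an empty queue.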
+@pytest.fixture(autouse=True)
+def s3_queue_setup_teardown(started_cluster):
+ instance = started_cluster.instances["instance"]
+ instance_2 = started_cluster.instances["instance2"]
+
+ instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
+ instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
+
+ minio = started_cluster.minio_client
+ objects = list(
+ minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
+ )
+ for obj in objects:
+ minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)
+ yield # run test
+
+
+MINIO_INTERNAL_PORT = 9001
+AVAILABLE_MODES = ["unordered", "ordered"]
+AUTH = "'minio','minio123',"
+
+SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+
+
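+# Upload raw bytes as an object to the given MinIO bucket.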
+def put_s3_file_content(started_cluster, bucket, filename, data):
+ buf = io.BytesIO(data)
+ started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
+
+
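+# Generate `count` CSV files with random integer rows under `prefix` in the bucket
+# and return all generated rows so tests can compare them with what was ingested.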
+def generate_random_files(
+ count, prefix, cluster, bucket, column_num=3, row_num=10, start_ind=0
+):
+ total_values = []
+ to_generate = [
+ (f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
+ ]
+ to_generate.sort(key=lambda x: x[0])
+
+ for filename, i in to_generate:
+ rand_values = [
+ [random.randint(0, 50) for _ in range(column_num)] for _ in range(row_num)
+ ]
+ total_values += rand_values
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
+ ).encode()
+ put_s3_file_content(cluster, bucket, filename, values_csv)
+ return total_values
+
+
+# Returns content of given S3 file as string.
+def get_s3_file_content(started_cluster, bucket, filename, decode=True):
+ # type: (ClickHouseCluster, str, str, bool) -> str
+
+ data = started_cluster.minio_client.get_object(bucket, filename)
+ data_str = b""
+ for chunk in data.stream():
+ data_str += chunk
+ if decode:
+ return data_str.decode()
+ return data_str
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+ try:
+ cluster = ClickHouseCluster(__file__)
+ cluster.add_instance(
+ "instance",
+ user_configs=["configs/users.xml"],
+ with_minio=True,
+ with_zookeeper=True,
+ main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
+ )
+ cluster.add_instance(
+ "instance2",
+ user_configs=["configs/users.xml"],
+ with_minio=True,
+ with_zookeeper=True,
+ main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
+ )
+
+ logging.info("Starting cluster...")
+ cluster.start()
+ logging.info("Cluster started")
+
+ prepare_s3_bucket(cluster)
+ yield cluster
+ finally:
+ cluster.shutdown()
+
+
+def run_query(instance, query, stdin=None, settings=None):
+ # type: (ClickHouseInstance, str, object, dict) -> str
+
+ logging.info("Running query '{}'...".format(query))
+ result = instance.query(query, stdin=stdin, settings=settings)
+ logging.info("Query finished")
+
+ return result
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_delete_after_processing(started_cluster, mode):
+ prefix = "delete"
+ bucket = started_cluster.minio_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ total_values = generate_random_files(5, prefix, started_cluster, bucket)
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/test_delete_{mode}',
+ s3queue_loading_retries = 3,
+ after_processing='delete';
+ """
+ )
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == total_values
+ minio = started_cluster.minio_client
+ objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
+ assert len(objects) == 0
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_failed_retry(started_cluster, mode):
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ values = [
+ ["failed", 1, 1],
+ ]
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in values)) + "\n"
+ ).encode()
+ filename = f"test.csv"
+ put_s3_file_content(started_cluster, bucket, filename, values_csv)
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/select_failed_retry_{mode}',
+ s3queue_loading_retries = 3;
+ """
+ )
+
+ # first try
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == []
+ # second try
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == []
+ # upload correct file
+ values = [
+ [1, 1, 1],
+ ]
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in values)) + "\n"
+ ).encode()
+ put_s3_file_content(started_cluster, bucket, filename, values_csv)
+
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == values
+
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == []
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_direct_select_file(started_cluster, mode):
+ auth = "'minio','minio123',"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+ values = [
+ [12549, 2463, 19893],
+ [64021, 38652, 66703],
+ [81611, 39650, 83516],
+ ]
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in values)) + "\n"
+ ).encode()
+ filename = f"test.csv"
+ put_s3_file_content(started_cluster, bucket, filename, values_csv)
+ instance.query(
+ """
+ DROP TABLE IF EXISTS test.s3_queue;
+ DROP TABLE IF EXISTS test.s3_queue_2;
+ DROP TABLE IF EXISTS test.s3_queue_3;
+ """
+ )
+
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/select_{mode}'
+ """
+ )
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == values
+
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_2 ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/select_{mode}'
+ """
+ )
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == []
+ # New table with same zookeeper path
+ get_query = f"SELECT * FROM test.s3_queue_2"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == []
+ # New table with different zookeeper path
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_3 ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path='/clickhouse/select_{mode}_2'
+ """
+ )
+ get_query = f"SELECT * FROM test.s3_queue_3"
+ assert [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ] == values
+
+ values = [
+ [1, 1, 1],
+ ]
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in values)) + "\n"
+ ).encode()
+ filename = f"t.csv"
+ put_s3_file_content(started_cluster, bucket, filename, values_csv)
+
+ get_query = f"SELECT * FROM test.s3_queue_3"
+ if mode == "unordered":
+ assert [
+ list(map(int, l.split()))
+ for l in run_query(instance, get_query).splitlines()
+ ] == values
+ elif mode == "ordered":
+ assert [
+ list(map(int, l.split()))
+ for l in run_query(instance, get_query).splitlines()
+ ] == []
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_direct_select_multiple_files(started_cluster, mode):
+ prefix = f"multiple_files_{mode}"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+ instance.query("drop table if exists test.s3_queue")
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/select_multiple_{mode}'
+ """
+ )
+
+ for i in range(5):
+ rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
+
+ values_csv = (
+ "\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
+ ).encode()
+ filename = f"{prefix}/test_{i}.csv"
+ put_s3_file_content(started_cluster, bucket, filename, values_csv)
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert [
+ list(map(int, l.split()))
+ for l in run_query(instance, get_query).splitlines()
+ ] == rand_values
+
+ total_values = generate_random_files(
+ 4, prefix, started_cluster, bucket, start_ind=5
+ )
+ get_query = f"SELECT * FROM test.s3_queue"
+ assert {
+ tuple(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ } == set([tuple(i) for i in total_values])
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_streaming_to_view(started_cluster, mode):
+ prefix = f"streaming_files_{mode}"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ total_values = generate_random_files(10, prefix, started_cluster, bucket)
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue_persistent;
+ DROP TABLE IF EXISTS test.s3_queue;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
+
+ CREATE TABLE test.s3_queue_persistent ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/view_{mode}';
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
+ SELECT
+ *
+ FROM test.s3_queue;
+ """
+ )
+ expected_values = set([tuple(i) for i in total_values])
+ for i in range(10):
+ get_query = f"SELECT * FROM test.persistent_s3_queue_mv"
+
+ selected_values = {
+ tuple(map(int, l.split()))
+ for l in run_query(instance, get_query).splitlines()
+ }
+ if selected_values != expected_values:
+ time.sleep(1)
+ else:
+ break
+
+ assert selected_values == expected_values
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_streaming_to_many_views(started_cluster, mode):
+ prefix = f"streaming_files_{mode}"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+ retry_cnt = 10
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue_persistent;
+ DROP TABLE IF EXISTS test.s3_queue_persistent_2;
+ DROP TABLE IF EXISTS test.s3_queue_persistent_3;
+ DROP TABLE IF EXISTS test.s3_queue;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
+
+
+ CREATE TABLE test.s3_queue_persistent ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue_persistent_2 ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue_persistent_3 ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/multiple_view_{mode}';
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
+ SELECT
+ *
+ FROM test.s3_queue;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
+ SELECT
+ *
+ FROM test.s3_queue;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
+ SELECT
+ *
+ FROM test.s3_queue;
+ """
+ )
+ total_values = generate_random_files(5, prefix, started_cluster, bucket)
+ expected_values = set([tuple(i) for i in total_values])
+
+ for i in range(retry_cnt):
+ retry = False
+ for get_query in [
+ f"SELECT * FROM test.s3_queue_persistent",
+ f"SELECT * FROM test.s3_queue_persistent_2",
+ f"SELECT * FROM test.s3_queue_persistent_3",
+ ]:
+ selected_values = {
+ tuple(map(int, l.split()))
+ for l in run_query(instance, get_query).splitlines()
+ }
+ if i == retry_cnt - 1:
+ assert selected_values == expected_values
+ if selected_values != expected_values:
+ retry = True
+ break
+ if retry:
+ time.sleep(1)
+ else:
+ break
+
+
+def test_multiple_tables_meta_mismatch(started_cluster):
+ prefix = f"test_meta"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'ordered',
+ keeper_path = '/clickhouse/test_meta';
+ """
+ )
+ # check mode
+ failed = False
+ try:
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_copy ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'unordered',
+ keeper_path = '/clickhouse/test_meta';
+ """
+ )
+ except QueryRuntimeException as e:
+ assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
+ failed = True
+ assert failed is True
+
+ # check columns
+ failed = False
+ table_format_copy = table_format + ", column4 UInt32"
+ try:
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_copy ({table_format_copy})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'ordered',
+ keeper_path = '/clickhouse/test_meta';
+ """
+ )
+ except QueryRuntimeException as e:
+ assert (
+ "Table columns structure in ZooKeeper is different from local table structure"
+ in str(e)
+ )
+ failed = True
+
+ assert failed is True
+
+ # check format
+ failed = False
+ try:
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_copy ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'TSV')
+ SETTINGS
+ mode = 'ordered',
+ keeper_path = '/clickhouse/test_meta';
+ """
+ )
+ except QueryRuntimeException as e:
+ assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
+ failed = True
+ assert failed is True
+
+ # create working engine
+ instance.query(
+ f"""
+ CREATE TABLE test.s3_queue_copy ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'ordered',
+ keeper_path = '/clickhouse/test_meta';
+ """
+ )
+
+
+def test_max_set_age(started_cluster):
+ files_to_generate = 10
+ max_age = 1
+ prefix = f"test_multiple"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'unordered',
+ keeper_path = '/clickhouse/test_set_age',
+ s3queue_tracked_files_limit = 10,
+ s3queue_tracked_file_ttl_sec = {max_age};
+ """
+ )
+
+ total_values = generate_random_files(
+ files_to_generate, prefix, started_cluster, bucket, row_num=1
+ )
+ get_query = f"SELECT * FROM test.s3_queue"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ assert res1 == total_values
+ time.sleep(max_age + 1)
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ assert res1 == total_values
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_multiple_tables_streaming_sync(started_cluster, mode):
+ files_to_generate = 300
+ poll_size = 30
+ prefix = f"test_multiple_{mode}"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+ DROP TABLE IF EXISTS test.s3_queue_copy;
+ DROP TABLE IF EXISTS test.s3_queue_copy_2;
+
+ DROP TABLE IF EXISTS test.s3_queue_persistent;
+ DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
+ DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
+
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
+ s3queue_polling_size = {poll_size};
+
+ CREATE TABLE test.s3_queue_copy ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
+ s3queue_polling_size = {poll_size};
+
+ CREATE TABLE test.s3_queue_copy_2 ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
+ s3queue_polling_size = {poll_size};
+
+ CREATE TABLE test.s3_queue_persistent ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue_persistent_copy ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
+ SELECT
+ *
+ FROM test.s3_queue;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
+ SELECT
+ *
+ FROM test.s3_queue_copy;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
+ SELECT
+ *
+ FROM test.s3_queue_copy_2;
+ """
+ )
+ total_values = generate_random_files(
+ files_to_generate, prefix, started_cluster, bucket, row_num=1
+ )
+
+ def get_count(table_name):
+ return int(run_query(instance, f"SELECT count() FROM {table_name}"))
+
+ for _ in range(100):
+ if (
+ get_count("test.s3_queue_persistent")
+ + get_count("test.s3_queue_persistent_copy")
+ + get_count("test.s3_queue_persistent_copy_2")
+ ) == files_to_generate:
+ break
+ time.sleep(1)
+
+ get_query = f"SELECT * FROM test.s3_queue_persistent"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
+ res2 = [
+ list(map(int, l.split()))
+ for l in run_query(instance, get_query_copy).splitlines()
+ ]
+ get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
+ res3 = [
+ list(map(int, l.split()))
+ for l in run_query(instance, get_query_copy_2).splitlines()
+ ]
+ assert {tuple(v) for v in res1 + res2 + res3} == set(
+ [tuple(i) for i in total_values]
+ )
+
+ # Checking that all files were processed only once
+ time.sleep(10)
+ assert (
+ get_count("test.s3_queue_persistent")
+ + get_count("test.s3_queue_persistent_copy")
+ + get_count("test.s3_queue_persistent_copy_2")
+ ) == files_to_generate
+
+
+@pytest.mark.parametrize("mode", AVAILABLE_MODES)
+def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
+ files_to_generate = 100
+ poll_size = 10
+ prefix = f"test_multiple_{mode}"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ instance_2 = started_cluster.instances["instance2"]
+
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+ for inst in [instance, instance_2]:
+ inst.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+ DROP TABLE IF EXISTS test.s3_queue_persistent;
+ DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = '{mode}',
+ keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
+ s3queue_polling_size = {poll_size};
+
+ CREATE TABLE test.s3_queue_persistent ({table_format})
+ ENGINE = MergeTree()
+ ORDER BY column1;
+
+ CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
+ SELECT
+ *
+ FROM test.s3_queue;
+ """
+ )
+
+ total_values = generate_random_files(
+ files_to_generate, prefix, started_cluster, bucket, row_num=1
+ )
+
+ def get_count(node, table_name):
+ return int(run_query(node, f"SELECT count() FROM {table_name}"))
+
+ for _ in range(100):
+ if (
+ get_count(instance, "test.s3_queue_persistent")
+ + get_count(instance_2, "test.s3_queue_persistent")
+ ) == files_to_generate:
+ break
+ time.sleep(1)
+
+ get_query = f"SELECT * FROM test.s3_queue_persistent"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ res2 = [
+ list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
+ ]
+
+ # Checking that all engines have made progress
+ assert len(res1) > 0
+ assert len(res2) > 0
+
+ assert len(res1) + len(res2) == files_to_generate
+ assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
+
+ # Checking that all files were processed only once
+ time.sleep(10)
+ assert (
+ get_count(instance, "test.s3_queue_persistent")
+ + get_count(instance_2, "test.s3_queue_persistent")
+ ) == files_to_generate
+
+
+def test_max_set_size(started_cluster):
+ files_to_generate = 10
+ prefix = f"test_multiple"
+ bucket = started_cluster.minio_restricted_bucket
+ instance = started_cluster.instances["instance"]
+ table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.s3_queue;
+
+ CREATE TABLE test.s3_queue ({table_format})
+ ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
+ SETTINGS
+ mode = 'unordered',
+ keeper_path = '/clickhouse/test_set_size',
+ s3queue_tracked_files_limit = {files_to_generate - 1};
+ """
+ )
+
+ total_values = generate_random_files(
+ files_to_generate, prefix, started_cluster, bucket, start_ind=0, row_num=1
+ )
+ get_query = f"SELECT * FROM test.s3_queue"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ assert res1 == total_values
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ assert res1 == [total_values[0]]
+
+ get_query = f"SELECT * FROM test.s3_queue"
+ res1 = [
+ list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
+ ]
+ assert res1 == [total_values[1]]