Merge pull request #49086 from s-kat/s3queue

Streaming Data Import From S3
This commit is contained in:
Kseniia Sumarokova 2023-08-02 11:32:48 +02:00 committed by GitHub
commit b2d2a295fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 3224 additions and 6 deletions

View File

@ -60,6 +60,7 @@ Engines in the family:
- [EmbeddedRocksDB](../../engines/table-engines/integrations/embedded-rocksdb.md)
- [RabbitMQ](../../engines/table-engines/integrations/rabbitmq.md)
- [PostgreSQL](../../engines/table-engines/integrations/postgresql.md)
- [S3Queue](../../engines/table-engines/integrations/s3queue.md)
### Special Engines {#special-engines}

View File

@ -0,0 +1,224 @@
---
slug: /en/engines/table-engines/integrations/s3queue
sidebar_position: 7
sidebar_label: S3Queue
---
# S3Queue Table Engine
This engine provides integration with [Amazon S3](https://aws.amazon.com/s3/) ecosystem and allows streaming import. This engine is similar to the [Kafka](../../../engines/table-engines/integrations/kafka.md), [RabbitMQ](../../../engines/table-engines/integrations/rabbitmq.md) engines, but provides S3-specific features.
## Create Table {#creating-a-table}
``` sql
CREATE TABLE s3_queue_engine_table (name String, value UInt32)
ENGINE = S3Queue(path [, NOSIGN | aws_access_key_id, aws_secret_access_key,] format, [compression])
[SETTINGS]
[mode = 'unordered',]
[after_processing = 'keep',]
[keeper_path = '',]
[s3queue_loading_retries = 0,]
[s3queue_polling_min_timeout_ms = 1000,]
[s3queue_polling_max_timeout_ms = 10000,]
[s3queue_polling_backoff_ms = 0,]
[s3queue_tracked_files_limit = 1000,]
[s3queue_tracked_file_ttl_sec = 0,]
[s3queue_polling_size = 50,]
```
**Engine parameters**
- `path` — Bucket url with path to file. Supports following wildcards in readonly mode: `*`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` — numbers, `'abc'`, `'def'` — strings. For more information see [below](#wildcards-in-path).
- `NOSIGN` - If this keyword is provided in place of credentials, all the requests will not be signed.
- `format` — The [format](../../../interfaces/formats.md#formats) of the file.
- `aws_access_key_id`, `aws_secret_access_key` - Long-term credentials for the [AWS](https://aws.amazon.com/) account user. You can use these to authenticate your requests. Parameter is optional. If credentials are not specified, they are used from the configuration file. For more information see [Using S3 for Data Storage](../mergetree-family/mergetree.md#table_engine-mergetree-s3).
- `compression` — Compression type. Supported values: `none`, `gzip/gz`, `brotli/br`, `xz/LZMA`, `zstd/zst`. Parameter is optional. By default, it will autodetect compression by file extension.
**Example**
```sql
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'ordred';
```
Using named collections:
``` xml
<clickhouse>
<named_collections>
<s3queue_conf>
<url>'https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*</url>
<access_key_id>test<access_key_id>
<secret_access_key>test</secret_access_key>
</s3queue_conf>
</named_collections>
</clickhouse>
```
```sql
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue(s3queue_conf, format = 'CSV', compression_method = 'gzip')
SETTINGS
mode = 'ordred';
```
## Settings {#s3queue-settings}
### mode {#mode}
Possible values:
- unordered — With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKeeper.
- ordered — With ordered mode, only the max name of the successfully consumed file, and the names of files that will be retried after unsuccessful loading attempt are being stored in ZooKeeper.
Default value: `unordered`.
### after_processing {#after_processing}
Delete or keep file after successful processing.
Possible values:
- keep.
- delete.
Default value: `keep`.
### keeper_path {#keeper_path}
The path in ZooKeeper can be specified as a table engine setting or default path can be formed from the global configuration-provided path and table UUID.
Possible values:
- String.
Default value: `/`.
### s3queue_loading_retries {#s3queue_loading_retries}
Retry file loading up to specified number of times. By default, there are no retries.
Possible values:
- Positive integer.
Default value: `0`.
### s3queue_polling_min_timeout_ms {#s3queue_polling_min_timeout_ms}
Minimal timeout before next polling (in milliseconds).
Possible values:
- Positive integer.
Default value: `1000`.
### s3queue_polling_max_timeout_ms {#s3queue_polling_max_timeout_ms}
Maximum timeout before next polling (in milliseconds).
Possible values:
- Positive integer.
Default value: `10000`.
### s3queue_polling_backoff_ms {#s3queue_polling_backoff_ms}
Polling backoff (in milliseconds).
Possible values:
- Positive integer.
Default value: `0`.
### s3queue_tracked_files_limit {#s3queue_tracked_files_limit}
Allows to limit the number of Zookeeper nodes if the 'unordered' mode is used, does nothing for 'ordered' mode.
If limit reached the oldest processed files will be deleted from ZooKeeper node and processed again.
Possible values:
- Positive integer.
Default value: `1000`.
### s3queue_tracked_file_ttl_sec {#s3queue_tracked_file_ttl_sec}
Maximum number of seconds to store processed files in ZooKeeper node (store forever by default) for 'unordered' mode, does nothing for 'ordered' mode.
After the specified number of seconds, the file will be re-imported.
Possible values:
- Positive integer.
Default value: `0`.
### s3queue_polling_size {#s3queue_polling_size}
Maximum files to fetch from S3 with SELECT or in background task.
Engine takes files for processing from S3 in batches.
We limit the batch size to increase concurrency if multiple table engines with the same `keeper_path` consume files from the same path.
Possible values:
- Positive integer.
Default value: `50`.
## S3-related Settings {#s3-settings}
Engine supports all s3 related settings. For more information about S3 settings see [here](../../../engines/table-engines/integrations/s3.md).
## Description {#description}
`SELECT` is not particularly useful for streaming import (except for debugging), because each file can be imported only once. It is more practical to create real-time threads using [materialized views](../../../sql-reference/statements/create/view.md). To do this:
1. Use the engine to create a table for consuming from specified path in S3 and consider it a data stream.
2. Create a table with the desired structure.
3. Create a materialized view that converts data from the engine and puts it into a previously created table.
When the `MATERIALIZED VIEW` joins the engine, it starts collecting data in the background.
Example:
``` sql
CREATE TABLE s3queue_engine_table (name String, value UInt32)
ENGINE=S3Queue('https://clickhouse-public-datasets.s3.amazonaws.com/my-test-bucket-768/*', 'CSV', 'gzip')
SETTINGS
mode = 'unordred',
keeper_path = '/clickhouse/s3queue/';
CREATE TABLE stats (name String, value UInt32)
ENGINE = MergeTree() ORDER BY name;
CREATE MATERIALIZED VIEW consumer TO stats
AS SELECT name, value FROM s3queue_engine_table;
SELECT * FROM stats ORDER BY name;
```
## Virtual columns {#virtual-columns}
- `_path` — Path to the file.
- `_file` — Name of the file.
For more information about virtual columns see [here](../../../engines/table-engines/index.md#table_engines-virtual_columns).
## Wildcards In Path {#wildcards-in-path}
`path` argument can specify multiple files using bash-like wildcards. For being processed file should exist and match to the whole path pattern. Listing of files is determined during `SELECT` (not at `CREATE` moment).
- `*` — Substitutes any number of any characters except `/` including empty string.
- `?` — Substitutes any single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}` — Substitutes any number in range from N to M including both borders. N and M can have leading zeroes e.g. `000..078`.
Constructions with `{}` are similar to the [remote](../../../sql-reference/table-functions/remote.md) table function.
:::note
If the listing of files contains number ranges with leading zeros, use the construction with braces for each digit separately or use `?`.
:::

View File

@ -248,6 +248,7 @@ add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)
add_object_library(clickhouse_server Server)

View File

@ -104,6 +104,7 @@ class IColumn;
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(String, s3queue_default_zookeeper_path, "/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
M(Bool, hdfs_create_new_file_on_insert, false, "Enables or disables creating a new file on each insert in hdfs engine tables", 0) \

View File

@ -175,4 +175,11 @@ IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
IMPLEMENT_SETTING_ENUM(S3QueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", S3QueueMode::ORDERED},
{"unordered", S3QueueMode::UNORDERED}})
IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", S3QueueAction::KEEP},
{"delete", S3QueueAction::DELETE}})
}

View File

@ -221,4 +221,21 @@ enum class ParallelReplicasCustomKeyFilterType : uint8_t
DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
enum class S3QueueMode
{
ORDERED,
UNORDERED,
};
DECLARE_SETTING_ENUM(S3QueueMode)
enum class S3QueueAction
{
KEEP,
DELETE,
};
DECLARE_SETTING_ENUM(S3QueueAction)
}

View File

@ -0,0 +1,351 @@
#include "IO/VarInt.h"
#include "config.h"
#if USE_AWS_S3
# include <algorithm>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <IO/ReadHelpers.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/StorageSnapshot.h>
# include <base/sleep.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
}
namespace
{
UInt64 getCurrentTime()
{
return std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
}
}
void S3QueueFilesMetadata::S3QueueCollection::read(ReadBuffer & in)
{
files = {};
if (in.eof())
return;
size_t files_num;
in >> files_num >> "\n";
while (files_num--)
{
TrackedCollectionItem item;
in >> item.file_path >> "\n";
in >> item.timestamp >> "\n";
in >> item.retries_count >> "\n";
in >> item.last_exception >> "\n";
files.push_back(item);
}
}
void S3QueueFilesMetadata::S3QueueCollection::write(WriteBuffer & out) const
{
out << files.size() << "\n";
for (const auto & processed_file : files)
{
out << processed_file.file_path << "\n";
out << processed_file.timestamp << "\n";
out << processed_file.retries_count << "\n";
out << processed_file.last_exception << "\n";
}
}
String S3QueueFilesMetadata::S3QueueCollection::toString() const
{
WriteBufferFromOwnString out;
write(out);
return out.str();
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueCollection::getFileNames()
{
S3FilesCollection keys = {};
for (const auto & pair : files)
keys.insert(pair.file_path);
return keys;
}
S3QueueFilesMetadata::S3QueueProcessedCollection::S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_)
: max_size(max_size_), max_age(max_age_)
{
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
if (max_age > 0) // Remove old items
{
std::erase_if(
files,
[timestamp = getCurrentTime(), this](const TrackedCollectionItem & processed_file)
{ return (timestamp - processed_file.timestamp) > max_age; });
}
}
void S3QueueFilesMetadata::S3QueueProcessedCollection::add(const String & file_name)
{
TrackedCollectionItem processed_file = { .file_path=file_name, .timestamp = getCurrentTime() };
files.push_back(processed_file);
/// TODO: it is strange that in parse() we take into account only max_age, but here only max_size.
while (files.size() > max_size)
{
files.pop_front();
}
}
S3QueueFilesMetadata::S3QueueFailedCollection::S3QueueFailedCollection(const UInt64 & max_retries_count_)
: max_retries_count(max_retries_count_)
{
}
void S3QueueFilesMetadata::S3QueueFailedCollection::parse(const String & collection_str)
{
ReadBufferFromString buf(collection_str);
read(buf);
}
bool S3QueueFilesMetadata::S3QueueFailedCollection::add(const String & file_name, const String & exception_message)
{
auto failed_it = std::find_if(
files.begin(), files.end(),
[&file_name](const TrackedCollectionItem & s) { return s.file_path == file_name; });
if (failed_it == files.end())
{
files.emplace_back(file_name, 0, max_retries_count, exception_message);
}
else if (failed_it->retries_count == 0 || --failed_it->retries_count == 0)
{
return false;
}
return true;
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::S3QueueFailedCollection::getFileNames()
{
S3FilesCollection failed_keys;
for (const auto & pair : files)
{
if (pair.retries_count == 0)
failed_keys.insert(pair.file_path);
}
return failed_keys;
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::parse(const String & collection_str)
{
ReadBufferFromString rb(collection_str);
Strings result;
readQuoted(result, rb);
files = S3FilesCollection(result.begin(), result.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::add(const Strings & file_names)
{
files.insert(file_names.begin(), file_names.end());
}
void S3QueueFilesMetadata::S3QueueProcessingCollection::remove(const String & file_name)
{
files.erase(file_name);
}
String S3QueueFilesMetadata::S3QueueProcessingCollection::toString() const
{
return DB::toString(Strings(files.begin(), files.end()));
}
S3QueueFilesMetadata::S3QueueFilesMetadata(
const StorageS3Queue * storage_,
const S3QueueSettings & settings_)
: storage(storage_)
, mode(settings_.mode)
, max_set_size(settings_.s3queue_tracked_files_limit.value)
, max_set_age_sec(settings_.s3queue_tracked_file_ttl_sec.value)
, max_loading_retries(settings_.s3queue_loading_retries.value)
, zookeeper_processing_path(fs::path(storage->getZooKeeperPath()) / "processing")
, zookeeper_processed_path(fs::path(storage->getZooKeeperPath()) / "processed")
, zookeeper_failed_path(fs::path(storage->getZooKeeperPath()) / "failed")
, zookeeper_lock_path(fs::path(storage->getZooKeeperPath()) / "lock")
, log(&Poco::Logger::get("S3QueueFilesMetadata"))
{
}
void S3QueueFilesMetadata::setFileProcessed(const String & file_path)
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
switch (mode)
{
case S3QueueMode::UNORDERED:
{
S3QueueProcessedCollection processed_files(max_set_size, max_set_age_sec);
processed_files.parse(zookeeper->get(zookeeper_processed_path));
processed_files.add(file_path);
zookeeper->set(zookeeper_processed_path, processed_files.toString());
break;
}
case S3QueueMode::ORDERED:
{
// Check that we set in ZooKeeper node only maximum processed file path.
// This check can be useful, when multiple table engines consume in ordered mode.
String max_file = getMaxProcessedFile();
if (max_file.compare(file_path) <= 0)
zookeeper->set(zookeeper_processed_path, file_path);
break;
}
}
removeProcessingFile(file_path);
}
bool S3QueueFilesMetadata::setFileFailed(const String & file_path, const String & exception_message)
{
auto zookeeper = storage->getZooKeeper();
auto lock = acquireLock(zookeeper);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(zookeeper->get(zookeeper_failed_path));
const bool can_be_retried = failed_collection.add(file_path, exception_message);
zookeeper->set(zookeeper_failed_path, failed_collection.toString());
removeProcessingFile(file_path);
return can_be_retried;
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getFailedFiles()
{
auto zookeeper = storage->getZooKeeper();
String failed_files = zookeeper->get(zookeeper_failed_path);
S3QueueFailedCollection failed_collection(max_loading_retries);
failed_collection.parse(failed_files);
return failed_collection.getFileNames();
}
String S3QueueFilesMetadata::getMaxProcessedFile()
{
auto zookeeper = storage->getZooKeeper();
return zookeeper->get(zookeeper_processed_path);
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessingFiles()
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
if (!zookeeper->tryGet(zookeeper_processing_path, processing_files))
return {};
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
return processing_collection.getFileNames();
}
void S3QueueFilesMetadata::setFilesProcessing(const Strings & file_paths)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
if (!processing_files.empty())
processing_collection.parse(processing_files);
processing_collection.add(file_paths);
if (zookeeper->exists(zookeeper_processing_path))
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
else
zookeeper->create(zookeeper_processing_path, processing_collection.toString(), zkutil::CreateMode::Ephemeral);
}
void S3QueueFilesMetadata::removeProcessingFile(const String & file_path)
{
auto zookeeper = storage->getZooKeeper();
String processing_files;
zookeeper->tryGet(zookeeper_processing_path, processing_files);
S3QueueProcessingCollection processing_collection;
processing_collection.parse(processing_files);
processing_collection.remove(file_path);
zookeeper->set(zookeeper_processing_path, processing_collection.toString());
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getUnorderedProcessedFiles()
{
auto zookeeper = storage->getZooKeeper();
S3QueueProcessedCollection processed_collection(max_set_size, max_set_age_sec);
processed_collection.parse(zookeeper->get(zookeeper_processed_path));
return processed_collection.getFileNames();
}
S3QueueFilesMetadata::S3FilesCollection S3QueueFilesMetadata::getProcessedFailedAndProcessingFiles()
{
S3FilesCollection processed_and_failed_files = getFailedFiles();
switch (mode)
{
case S3QueueMode::UNORDERED:
{
processed_and_failed_files.merge(getUnorderedProcessedFiles());
break;
}
case S3QueueMode::ORDERED:
{
processed_and_failed_files.insert(getMaxProcessedFile());
break;
}
}
processed_and_failed_files.merge(getProcessingFiles());
return processed_and_failed_files;
}
std::shared_ptr<zkutil::EphemeralNodeHolder> S3QueueFilesMetadata::acquireLock(zkutil::ZooKeeperPtr zookeeper)
{
UInt32 retry_count = 200;
UInt32 sleep_ms = 100;
UInt32 retries = 0;
while (true)
{
Coordination::Error code = zookeeper->tryCreate(zookeeper_lock_path, "", zkutil::CreateMode::Ephemeral);
if (code == Coordination::Error::ZNONODE || code == Coordination::Error::ZNODEEXISTS)
{
retries++;
if (retries > retry_count)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Can't acquire zookeeper lock");
}
sleepForMilliseconds(sleep_ms);
}
else if (code != Coordination::Error::ZOK)
{
throw Coordination::Exception(code, zookeeper_lock_path);
}
else
{
return zkutil::EphemeralNodeHolder::existing(zookeeper_lock_path, *zookeeper);
}
}
}
}
#endif

View File

@ -0,0 +1,124 @@
#pragma once
#if USE_AWS_S3
# include <Core/UUID.h>
# include <Interpreters/Context.h>
# include <Storages/StorageS3Settings.h>
# include <Common/ZooKeeper/ZooKeeper.h>
namespace DB
{
class StorageS3Queue;
struct S3QueueSettings;
class S3QueueFilesMetadata
{
public:
struct TrackedCollectionItem
{
String file_path;
UInt64 timestamp = 0;
UInt64 retries_count = 0;
String last_exception;
};
using S3FilesCollection = std::unordered_set<String>;
using TrackedFiles = std::deque<TrackedCollectionItem>;
S3QueueFilesMetadata(const StorageS3Queue * storage_, const S3QueueSettings & settings_);
void setFilesProcessing(const Strings & file_paths);
void setFileProcessed(const String & file_path);
bool setFileFailed(const String & file_path, const String & exception_message);
S3FilesCollection getProcessedFailedAndProcessingFiles();
String getMaxProcessedFile();
std::shared_ptr<zkutil::EphemeralNodeHolder> acquireLock(zkutil::ZooKeeperPtr zookeeper);
struct S3QueueCollection
{
public:
virtual ~S3QueueCollection() = default;
virtual String toString() const;
S3FilesCollection getFileNames();
virtual void parse(const String & collection_str) = 0;
protected:
TrackedFiles files;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
};
struct S3QueueProcessedCollection : public S3QueueCollection
{
public:
S3QueueProcessedCollection(const UInt64 & max_size_, const UInt64 & max_age_);
void parse(const String & collection_str) override;
void add(const String & file_name);
private:
const UInt64 max_size;
const UInt64 max_age;
};
struct S3QueueFailedCollection : S3QueueCollection
{
public:
S3QueueFailedCollection(const UInt64 & max_retries_count_);
void parse(const String & collection_str) override;
bool add(const String & file_name, const String & exception_message);
S3FilesCollection getFileNames();
private:
UInt64 max_retries_count;
};
struct S3QueueProcessingCollection
{
public:
S3QueueProcessingCollection() = default;
void parse(const String & collection_str);
void add(const Strings & file_names);
void remove(const String & file_name);
String toString() const;
const S3FilesCollection & getFileNames() const { return files; }
private:
S3FilesCollection files;
};
private:
const StorageS3Queue * storage;
const S3QueueMode mode;
const UInt64 max_set_size;
const UInt64 max_set_age_sec;
const UInt64 max_loading_retries;
const String zookeeper_processing_path;
const String zookeeper_processed_path;
const String zookeeper_failed_path;
const String zookeeper_lock_path;
mutable std::mutex mutex;
Poco::Logger * log;
S3FilesCollection getFailedFiles();
S3FilesCollection getProcessingFiles();
S3FilesCollection getUnorderedProcessedFiles();
void removeProcessingFile(const String & file_path);
};
}
#endif

View File

@ -0,0 +1,41 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
void S3QueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("for storage " + storage_def.engine->name);
throw;
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}

View File

@ -0,0 +1,46 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class ASTStorage;
#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(S3QueueMode, \
mode, \
S3QueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt64, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt64, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt64, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt64, s3queue_polling_backoff_ms, 0, "Polling backoff", 0) \
M(UInt64, s3queue_tracked_files_limit, 1000, "Max set size for tracking processed files in unordered mode in ZooKeeper", 0) \
M(UInt64, \
s3queue_tracked_file_ttl_sec, \
0, \
"Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", \
0) \
M(UInt64, s3queue_polling_size, 50, "Maximum files to fetch from S3 with SELECT", 0)
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
FORMAT_FACTORY_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
struct S3QueueSettings : public BaseSettings<S3QueueSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -0,0 +1,321 @@
#include <algorithm>
#include <Common/ProfileEvents.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include "IO/ParallelReadBuffer.h"
#include "Parsers/ASTCreateQuery.h"
#include "config.h"
#if USE_AWS_S3
# include <Common/isValidUTF8.h>
# include <Functions/FunctionsConversion.h>
# include <IO/S3/Requests.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/CurrentMetrics.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Common/parseGlobs.h>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
namespace CurrentMetrics
{
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
}
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
}
StorageS3QueueSource::QueueGlobIterator::QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_)
: max_poll_size(max_poll_size_)
, glob_iterator(std::make_unique<StorageS3QueueSource::DisclosedGlobIterator>(
client_, globbed_uri_, query, virtual_header, context, nullptr, request_settings_))
{
/// todo(kssenii): remove this loop, it should not be here
while (true)
{
KeyWithInfo val = glob_iterator->next();
if (val.key.empty())
break;
keys_buf.push_back(val);
}
}
Strings StorageS3QueueSource::QueueGlobIterator::filterProcessingFiles(
const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file)
{
for (const KeyWithInfo & val : keys_buf)
{
auto full_path = val.key;
if (exclude_keys.find(full_path) != exclude_keys.end())
{
LOG_TEST(log, "File {} will be skipped, because it was found in exclude files list "
"(either already processed or failed to be processed)", val.key);
continue;
}
if ((engine_mode == S3QueueMode::ORDERED) && (full_path.compare(max_file) <= 0))
continue;
if ((processing_keys.size() < max_poll_size) || (engine_mode == S3QueueMode::ORDERED))
{
processing_keys.push_back(val);
}
else
{
break;
}
}
if (engine_mode == S3QueueMode::ORDERED)
{
std::sort(
processing_keys.begin(),
processing_keys.end(),
[](const KeyWithInfo & lhs, const KeyWithInfo & rhs) { return lhs.key.compare(rhs.key) < 0; });
if (processing_keys.size() > max_poll_size)
{
processing_keys.erase(processing_keys.begin() + max_poll_size, processing_keys.end());
}
}
Strings keys;
for (const auto & key_info : processing_keys)
keys.push_back(key_info.key);
processing_keys.push_back(KeyWithInfo());
processing_iterator = processing_keys.begin();
return keys;
}
StorageS3QueueSource::KeyWithInfo StorageS3QueueSource::QueueGlobIterator::next()
{
std::lock_guard lock(mutex);
if (processing_iterator != processing_keys.end())
{
return *processing_iterator++;
}
return KeyWithInfo();
}
Block StorageS3QueueSource::getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns)
{
for (const auto & virtual_column : requested_virtual_columns)
sample_block.insert({virtual_column.type->createColumn(), virtual_column.type, virtual_column.name});
return sample_block;
}
StorageS3QueueSource::StorageS3QueueSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const String & format_,
String name_,
const Block & sample_block_,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket_,
const String & version_id_,
std::shared_ptr<IIterator> file_iterator_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
const size_t download_thread_num_)
: ISource(getHeader(sample_block_, requested_virtual_columns_))
, WithContext(context_)
, name(std::move(name_))
, bucket(bucket_)
, version_id(version_id_)
, format(format_)
, columns_desc(columns_)
, request_settings(request_settings_)
, client(client_)
, files_metadata(files_metadata_)
, requested_virtual_columns(requested_virtual_columns_)
, file_iterator(file_iterator_)
, action(action_)
{
internal_source = std::make_shared<StorageS3Source>(
requested_virtual_columns_,
format_,
name_,
sample_block_,
context_,
format_settings_,
columns_,
max_block_size_,
request_settings_,
compression_hint_,
client_,
bucket_,
version_id_,
file_iterator,
download_thread_num_);
reader = std::move(internal_source->reader);
if (reader)
reader_future = std::move(internal_source->reader_future);
}
StorageS3QueueSource::~StorageS3QueueSource()
{
internal_source->create_reader_pool.wait();
}
String StorageS3QueueSource::getName() const
{
return name;
}
Chunk StorageS3QueueSource::generate()
{
auto file_progress = getContext()->getFileProgressCallback();
while (true)
{
if (isCancelled() || !reader)
{
if (reader)
reader->cancel();
break;
}
Chunk chunk;
bool success_in_pulling = false;
try
{
if (reader->pull(chunk))
{
UInt64 num_rows = chunk.getNumRows();
auto file_path = reader.getPath();
for (const auto & virtual_column : requested_virtual_columns)
{
if (virtual_column.name == "_path")
{
chunk.addColumn(virtual_column.type->createColumnConst(num_rows, file_path)->convertToFullColumnIfConst());
}
else if (virtual_column.name == "_file")
{
size_t last_slash_pos = file_path.find_last_of('/');
auto column = virtual_column.type->createColumnConst(num_rows, file_path.substr(last_slash_pos + 1));
chunk.addColumn(column->convertToFullColumnIfConst());
}
}
success_in_pulling = true;
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Exception in chunk pulling: {} ", e.displayText());
files_metadata->setFileFailed(reader.getFile(), e.message());
success_in_pulling = false;
}
if (success_in_pulling)
{
applyActionAfterProcessing(reader.getFile());
files_metadata->setFileProcessed(reader.getFile());
return chunk;
}
assert(reader_future.valid());
reader = reader_future.get();
if (!reader)
break;
/// Even if task is finished the thread may be not freed in pool.
/// So wait until it will be freed before scheduling a new task.
internal_source->create_reader_pool.wait();
reader_future = internal_source->createReaderAsync();
}
return {};
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & file_path)
{
switch (action)
{
case S3QueueAction::DELETE:
deleteProcessedObject(file_path);
break;
case S3QueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::deleteProcessedObject(const String & file_path)
{
LOG_INFO(log, "Delete processed file {} from bucket {}", file_path, bucket);
S3::DeleteObjectRequest request;
request.WithKey(file_path).WithBucket(bucket);
auto outcome = client->DeleteObject(request);
if (!outcome.IsSuccess())
{
const auto & err = outcome.GetError();
LOG_ERROR(log, "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
else
{
LOG_TRACE(log, "Object with path {} was removed from S3", file_path);
}
}
}
#endif

View File

@ -0,0 +1,124 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageS3Settings.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Poco/URI.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/logger_useful.h>
namespace DB
{
class StorageS3QueueSource : public ISource, WithContext
{
public:
using IIterator = StorageS3Source::IIterator;
using DisclosedGlobIterator = StorageS3Source::DisclosedGlobIterator;
using KeysWithInfo = StorageS3Source::KeysWithInfo;
using KeyWithInfo = StorageS3Source::KeyWithInfo;
class QueueGlobIterator : public IIterator
{
public:
QueueGlobIterator(
const S3::Client & client_,
const S3::URI & globbed_uri_,
ASTPtr query,
const Block & virtual_header,
ContextPtr context,
UInt64 & max_poll_size_,
const S3Settings::RequestSettings & request_settings_ = {});
KeyWithInfo next() override;
Strings
filterProcessingFiles(const S3QueueMode & engine_mode, std::unordered_set<String> & exclude_keys, const String & max_file = "");
private:
UInt64 max_poll_size;
KeysWithInfo keys_buf;
KeysWithInfo processing_keys;
mutable std::mutex mutex;
std::unique_ptr<DisclosedGlobIterator> glob_iterator;
std::vector<KeyWithInfo>::iterator processing_iterator;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSourceIterator");
};
static Block getHeader(Block sample_block, const std::vector<NameAndTypePair> & requested_virtual_columns);
StorageS3QueueSource(
const std::vector<NameAndTypePair> & requested_virtual_columns_,
const String & format,
String name_,
const Block & sample_block,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const S3Settings::RequestSettings & request_settings_,
String compression_hint_,
const std::shared_ptr<const S3::Client> & client_,
const String & bucket,
const String & version_id,
std::shared_ptr<IIterator> file_iterator_,
std::shared_ptr<S3QueueFilesMetadata> files_metadata_,
const S3QueueAction & action_,
size_t download_thread_num);
~StorageS3QueueSource() override;
String getName() const override;
Chunk generate() override;
private:
String name;
String bucket;
String version_id;
String format;
ColumnsDescription columns_desc;
S3Settings::RequestSettings request_settings;
std::shared_ptr<const S3::Client> client;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
using ReaderHolder = StorageS3Source::ReaderHolder;
ReaderHolder reader;
std::vector<NameAndTypePair> requested_virtual_columns;
std::shared_ptr<IIterator> file_iterator;
const S3QueueAction action;
Poco::Logger * log = &Poco::Logger::get("StorageS3QueueSource");
std::future<ReaderHolder> reader_future;
mutable std::mutex mutex;
std::shared_ptr<StorageS3Source> internal_source;
void deleteProcessedObject(const String & file_path);
void applyActionAfterProcessing(const String & file_path);
};
}
#endif

View File

@ -0,0 +1,115 @@
#include <config.h>
#if USE_AWS_S3
# include <Poco/JSON/JSON.h>
# include <Poco/JSON/Object.h>
# include <Poco/JSON/Parser.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/StorageS3.h>
namespace DB
{
namespace ErrorCodes
{
extern const int METADATA_MISMATCH;
}
S3QueueTableMetadata::S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
s3queue_tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
s3queue_tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
}
String S3QueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
json.set("mode", mode);
json.set("s3queue_tracked_files_limit", s3queue_tracked_files_limit);
json.set("s3queue_tracked_file_ttl_sec", s3queue_tracked_file_ttl_sec);
json.set("format_name", format_name);
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(json, oss);
return oss.str();
}
void S3QueueTableMetadata::read(const String & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
after_processing = json->getValue<String>("after_processing");
mode = json->getValue<String>("mode");
s3queue_tracked_files_limit = json->getValue<UInt64>("s3queue_tracked_files_limit");
s3queue_tracked_file_ttl_sec = json->getValue<UInt64>("s3queue_tracked_file_ttl_sec");
format_name = json->getValue<String>("format_name");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
{
S3QueueTableMetadata metadata;
metadata.read(metadata_str);
return metadata;
}
void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
{
if (after_processing != from_zk.after_processing)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs "
"in action after processing. Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
if (mode != from_zk.mode)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in engine mode. "
"Stored in ZooKeeper: {}, local: {}",
DB::toString(from_zk.after_processing),
DB::toString(after_processing));
if (s3queue_tracked_files_limit != from_zk.s3queue_tracked_files_limit)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set size. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_tracked_files_limit,
s3queue_tracked_files_limit);
if (s3queue_tracked_file_ttl_sec != from_zk.s3queue_tracked_file_ttl_sec)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set age. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.s3queue_tracked_file_ttl_sec,
s3queue_tracked_file_ttl_sec);
if (format_name != from_zk.format_name)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in format name. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.format_name,
format_name);
}
void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
{
checkImmutableFieldsEquals(from_zk);
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#if USE_AWS_S3
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/StorageS3.h>
# include <base/types.h>
namespace DB
{
class WriteBuffer;
class ReadBuffer;
/** The basic parameters of S3Queue table engine for saving in ZooKeeper.
* Lets you verify that they match local ones.
*/
struct S3QueueTableMetadata
{
String format_name;
String after_processing;
String mode;
UInt64 s3queue_tracked_files_limit;
UInt64 s3queue_tracked_file_ttl_sec;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(const StorageS3::Configuration & configuration, const S3QueueSettings & engine_settings);
void read(const String & metadata_str);
static S3QueueTableMetadata parse(const String & metadata_str);
String toString() const;
void checkEquals(const S3QueueTableMetadata & from_zk) const;
private:
void checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const;
};
}
#endif

View File

@ -0,0 +1,711 @@
#include "config.h"
#if USE_AWS_S3
# include <Databases/DatabaseReplicated.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteHelpers.h>
# include <Interpreters/InterpreterInsertQuery.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Common/ProfileEvents.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Common/isValidUTF8.h>
# include "IO/ParallelReadBuffer.h"
# include <Functions/FunctionsConversion.h>
# include <IO/S3Common.h>
# include <Interpreters/TreeRewriter.h>
# include <Parsers/ASTFunction.h>
# include <Parsers/ASTInsertQuery.h>
# include <Storages/NamedCollectionsHelpers.h>
# include <Storages/PartitionedSink.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/S3Queue/S3QueueTableMetadata.h>
# include <Storages/S3Queue/StorageS3Queue.h>
# include <Storages/StorageFactory.h>
# include <Storages/StorageMaterializedView.h>
# include <Storages/StorageS3.h>
# include <Storages/StorageSnapshot.h>
# include <Storages/VirtualColumnUtils.h>
# include <Storages/getVirtualsForStorage.h>
# include <Common/NamedCollections/NamedCollections.h>
# include <Formats/FormatFactory.h>
# include <Processors/Formats/IInputFormat.h>
# include <Processors/Formats/IOutputFormat.h>
# include <Processors/Transforms/AddingDefaultsTransform.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <DataTypes/DataTypeString.h>
# include <Common/parseGlobs.h>
# include <filesystem>
# include <Processors/ISource.h>
# include <Processors/Sinks/SinkToStorage.h>
# include <QueryPipeline/Pipe.h>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
static const auto MAX_THREAD_WORK_DURATION_MS = 60000;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int QUERY_NOT_ALLOWED;
extern const int NO_ZOOKEEPER;
extern const int REPLICA_ALREADY_EXISTS;
extern const int INCOMPATIBLE_COLUMNS;
}
StorageS3Queue::StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
const StorageS3::Configuration & configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_)
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, after_processing(s3queue_settings->after_processing)
, configuration{configuration_}
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, format_settings(format_settings_)
, partition_by(partition_by_)
, log(&Poco::Logger::get("StorageS3Queue (" + table_id_.table_name + ")"))
{
if (configuration.url.key.ends_with('/'))
configuration.url.key += '*';
if (!withGlobs())
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
String setting_zk_path = s3queue_settings->keeper_path;
if (setting_zk_path.empty())
{
auto database = DatabaseCatalog::instance().getDatabase(table_id_.database_name);
bool is_in_replicated_database = database->getEngineName() == "Replicated";
auto default_path = getContext()->getSettingsRef().s3queue_default_zookeeper_path.value;
String zk_path_prefix;
if (!default_path.empty())
{
zk_path_prefix = default_path;
}
else if (is_in_replicated_database)
{
LOG_INFO(log, "S3Queue engine zookeeper path is not specified. "
"Using replicated database zookeeper path");
zk_path_prefix = fs::path(assert_cast<const DatabaseReplicated *>(database.get())->getZooKeeperPath()) / "s3queue";
}
else
{
throw Exception(ErrorCodes::NO_ZOOKEEPER,
"S3Queue keeper_path engine setting not specified, "
"s3queue_default_zookeeper_path_prefix not specified");
}
zk_path = zkutil::extractZooKeeperPath(
fs::path(zk_path_prefix) / toString(table_id_.uuid), /* check_starts_with_slash */ true, log);
}
else
{
/// We do not add table uuid here on purpose.
zk_path = zkutil::extractZooKeeperPath(s3queue_settings->keeper_path.value, /* check_starts_with_slash */ true, log);
}
LOG_INFO(log, "Using zookeeper path: {}", zk_path);
FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
StorageInMemoryMetadata storage_metadata;
configuration.update(context_);
if (columns_.empty())
{
auto columns = StorageS3::getTableStructureFromDataImpl(configuration, format_settings, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
auto metadata_snapshot = getInMemoryMetadataPtr();
const bool is_first_replica = createTableIfNotExists(metadata_snapshot);
if (!is_first_replica)
{
checkTableStructure(zk_path, metadata_snapshot);
}
files_metadata = std::make_shared<S3QueueFilesMetadata>(this, *s3queue_settings);
auto default_virtuals = NamesAndTypesList{
{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
auto columns = storage_metadata.getSampleBlock().getNamesAndTypesList();
virtual_columns = getVirtualsForStorage(columns, default_virtuals);
for (const auto & column : virtual_columns)
virtual_block.insert({column.type->createColumn(), column.type, column.name});
auto poll_thread = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = std::make_shared<TaskContext>(std::move(poll_thread));
}
bool StorageS3Queue::supportsSubcolumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubcolumns(configuration.format);
}
bool StorageS3Queue::supportsSubsetOfColumns() const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration.format);
}
Pipe StorageS3Queue::read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr local_context,
QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size,
size_t /* num_streams */)
{
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageS3Queue with attached materialized views");
auto query_configuration = updateConfigurationAndGetCopy(local_context);
Pipes pipes;
std::unordered_set<String> column_names_set(column_names.begin(), column_names.end());
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
if (column_names_set.contains(virtual_column.name))
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(local_context, query_info.query);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{
return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
});
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
const size_t max_download_threads = local_context->getSettingsRef().max_download_threads;
return Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
configuration.format,
getName(),
block_for_format,
local_context,
format_settings,
columns_description,
max_block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
}
SinkToStoragePtr StorageS3Queue::write(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, bool)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Write is not supported by storage {}", getName());
}
void StorageS3Queue::truncate(const ASTPtr &, const StorageMetadataPtr &, ContextPtr, TableExclusiveLockHolder &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate is not supported by storage {}", getName());
}
NamesAndTypesList StorageS3Queue::getVirtuals() const
{
return virtual_columns;
}
bool StorageS3Queue::supportsPartitionBy() const
{
return true;
}
void StorageS3Queue::startup()
{
if (task)
task->holder->activateAndSchedule();
}
void StorageS3Queue::shutdown()
{
shutdown_called = true;
if (task)
{
task->stream_cancelled = true;
task->holder->deactivate();
}
}
size_t StorageS3Queue::getTableDependentCount() const
{
auto table_id = getStorageID();
// Check if at least one direct dependency is attached
return DatabaseCatalog::instance().getDependentViews(table_id).size();
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
LOG_TEST(log, "Number of attached views {} for {}", view_ids.size(), table_id.getNameForLogs());
if (view_ids.empty())
return false;
// Check the dependencies are ready?
for (const auto & view_id : view_ids)
{
auto view = DatabaseCatalog::instance().tryGetTable(view_id, getContext());
if (!view)
return false;
// If it materialized view, check it's target table
auto * materialized_view = dynamic_cast<StorageMaterializedView *>(view.get());
if (materialized_view && !materialized_view->tryGetTargetTable())
return false;
}
return true;
}
void StorageS3Queue::threadFunc()
{
bool reschedule = true;
try
{
auto table_id = getStorageID();
auto dependencies_count = getTableDependentCount();
if (dependencies_count)
{
auto start_time = std::chrono::steady_clock::now();
mv_attached.store(true);
// Keep streaming as long as there are attached views and streaming is not cancelled
while (!task->stream_cancelled)
{
if (!hasDependencies(table_id))
{
/// For this case, we can not wait for watch thread to wake up
reschedule = true;
break;
}
LOG_DEBUG(log, "Started streaming to {} attached views", dependencies_count);
streamToViews();
auto ts = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts - start_time);
if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
{
LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
reschedule = true;
break;
}
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
mv_attached.store(false);
if (reschedule && !shutdown_called)
{
LOG_TRACE(log, "Reschedule S3 Queue thread func.");
/// Reschedule with backoff.
if (reschedule_processing_interval_ms < s3queue_settings->s3queue_polling_max_timeout_ms)
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
task->holder->scheduleAfter(reschedule_processing_interval_ms);
}
}
void StorageS3Queue::streamToViews()
{
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Engine table {} doesn't exist.", table_id.getNameForLogs());
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
size_t block_size = 100;
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto query_configuration = updateConfigurationAndGetCopy(s3queue_context);
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
auto block_io = interpreter.execute();
auto column_names = block_io.pipeline.getHeader().getNames();
// Create a stream for each consumer and join them in a union stream
std::vector<NameAndTypePair> requested_virtual_columns;
for (const auto & virtual_column : getVirtuals())
{
requested_virtual_columns.push_back(virtual_column);
}
std::shared_ptr<StorageS3Source::IIterator> iterator_wrapper = createFileIterator(s3queue_context, nullptr);
ColumnsDescription columns_description;
Block block_for_format;
if (supportsSubsetOfColumns())
{
auto fetch_columns = column_names;
const auto & virtuals = getVirtuals();
std::erase_if(
fetch_columns,
[&](const String & col)
{
return std::any_of(
virtuals.begin(), virtuals.end(), [&](const NameAndTypePair & virtual_col) { return col == virtual_col.name; });
});
if (fetch_columns.empty())
fetch_columns.push_back(ExpressionActions::getSmallestColumn(storage_snapshot->metadata->getColumns().getAllPhysical()).name);
columns_description = storage_snapshot->getDescriptionForColumns(fetch_columns);
block_for_format = storage_snapshot->getSampleBlockForColumns(columns_description.getNamesOfPhysical());
}
else
{
columns_description = storage_snapshot->metadata->getColumns();
block_for_format = storage_snapshot->metadata->getSampleBlock();
}
const size_t max_download_threads = s3queue_context->getSettingsRef().max_download_threads;
Pipes pipes;
auto pipe = Pipe(std::make_shared<StorageS3QueueSource>(
requested_virtual_columns,
configuration.format,
getName(),
block_for_format,
s3queue_context,
format_settings,
columns_description,
block_size,
query_configuration.request_settings,
configuration.compression_method,
query_configuration.client,
query_configuration.url.bucket,
query_configuration.url.version_id,
iterator_wrapper,
files_metadata,
after_processing,
max_download_threads));
std::atomic_size_t rows = 0;
{
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();
}
}
StorageS3Queue::Configuration StorageS3Queue::updateConfigurationAndGetCopy(ContextPtr local_context)
{
configuration.update(local_context);
return configuration;
}
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
{
std::lock_guard lock{zk_mutex};
if (!zk_client || zk_client->expired())
{
zk_client = getContext()->getZooKeeper();
zk_client->sync(zk_path);
}
return zk_client;
}
bool StorageS3Queue::createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot)
{
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zk_path);
for (size_t i = 0; i < zk_create_table_retries; ++i)
{
Coordination::Requests ops;
bool is_first_replica = true;
if (zookeeper->exists(zk_path + "/metadata"))
{
if (!zookeeper->exists(zk_path + "/processing"))
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
LOG_DEBUG(log, "This table {} is already created, will use existing metadata for checking engine settings", zk_path);
is_first_replica = false;
}
else
{
String metadata_str = S3QueueTableMetadata(configuration, *s3queue_settings).toString();
ops.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/failed", "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/processing", "", zkutil::CreateMode::Ephemeral));
ops.emplace_back(zkutil::makeCreateRequest(
zk_path + "/columns", metadata_snapshot->getColumns().toString(), zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeCreateRequest(zk_path + "/metadata", metadata_str, zkutil::CreateMode::Persistent));
}
Coordination::Responses responses;
auto code = zookeeper->tryMulti(ops, responses);
if (code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "It looks like the table {} was created by another server at the same moment, will retry", zk_path);
continue;
}
else if (code != Coordination::Error::ZOK)
{
zkutil::KeeperMultiException::check(code, ops, responses);
}
return is_first_replica;
}
throw Exception(
ErrorCodes::REPLICA_ALREADY_EXISTS,
"Cannot create table, because it is created concurrently every time or because "
"of wrong zk_path or because of logical error");
}
/** Verify that list of columns and table settings match those specified in ZK (/metadata).
* If not, throw an exception.
*/
void StorageS3Queue::checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot)
{
auto zookeeper = getZooKeeper();
S3QueueTableMetadata old_metadata(configuration, *s3queue_settings);
Coordination::Stat metadata_stat;
String metadata_str = zookeeper->get(fs::path(zookeeper_prefix) / "metadata", &metadata_stat);
auto metadata_from_zk = S3QueueTableMetadata::parse(metadata_str);
old_metadata.checkEquals(metadata_from_zk);
Coordination::Stat columns_stat;
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(fs::path(zookeeper_prefix) / "columns", &columns_stat));
const ColumnsDescription & old_columns = metadata_snapshot->getColumns();
if (columns_from_zk != old_columns)
{
throw Exception(
ErrorCodes::INCOMPATIBLE_COLUMNS,
"Table columns structure in ZooKeeper is different from local table structure. Local columns:\n"
"{}\nZookeeper columns:\n{}",
old_columns.toString(),
columns_from_zk.toString());
}
}
std::shared_ptr<StorageS3QueueSource::IIterator>
StorageS3Queue::createFileIterator(ContextPtr local_context, ASTPtr query)
{
auto it = std::make_shared<StorageS3QueueSource::QueueGlobIterator>(
*configuration.client,
configuration.url,
query,
virtual_block,
local_context,
s3queue_settings->s3queue_polling_size.value,
configuration.request_settings);
auto zookeeper = getZooKeeper();
auto lock = files_metadata->acquireLock(zookeeper);
S3QueueFilesMetadata::S3FilesCollection files_to_skip = files_metadata->getProcessedFailedAndProcessingFiles();
Strings files_to_process;
if (s3queue_settings->mode == S3QueueMode::UNORDERED)
{
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip);
}
else
{
String max_processed_file = files_metadata->getMaxProcessedFile();
files_to_process = it->filterProcessingFiles(s3queue_settings->mode, files_to_skip, max_processed_file);
}
LOG_TEST(log, "Found files to process: {}", fmt::join(files_to_process, ", "));
files_metadata->setFilesProcessing(files_to_process);
return it;
}
void StorageS3Queue::drop()
{
auto zookeeper = getZooKeeper();
if (zookeeper->exists(zk_path))
zookeeper->removeRecursive(zk_path);
}
void registerStorageS3QueueImpl(const String & name, StorageFactory & factory)
{
factory.registerStorage(
name,
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = StorageS3::getConfiguration(engine_args, args.getLocalContext());
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto s3queue_settings = std::make_unique<S3QueueSettings>();
if (args.storage_def->settings)
{
s3queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
else
LOG_TRACE(&Poco::Logger::get("StorageS3"), "Remove: {}", change.name);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
ASTPtr partition_by;
if (args.storage_def->partition_by)
partition_by = args.storage_def->partition_by->clone();
return std::make_shared<StorageS3Queue>(
std::move(s3queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
partition_by);
},
{
.supports_settings = true,
.supports_sort_order = true, // for partition by
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
void registerStorageS3Queue(StorageFactory & factory)
{
return registerStorageS3QueueImpl("S3Queue", factory);
}
}
#endif

View File

@ -0,0 +1,146 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
# include <Core/Types.h>
# include <Compression/CompressionInfo.h>
# include <Common/ZooKeeper/ZooKeeper.h>
# include <Core/BackgroundSchedulePool.h>
# include <Storages/IStorage.h>
# include <Storages/S3Queue/S3QueueFilesMetadata.h>
# include <Storages/S3Queue/S3QueueSettings.h>
# include <Storages/S3Queue/S3QueueSource.h>
# include <Storages/StorageS3Settings.h>
# include <IO/CompressionMethod.h>
# include <IO/S3/getObjectInfo.h>
# include <Interpreters/Context.h>
# include <Interpreters/threadPoolCallbackRunner.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/ISource.h>
# include <Storages/Cache/SchemaCache.h>
# include <Storages/StorageConfiguration.h>
# include <Storages/StorageS3.h>
# include <Poco/URI.h>
# include <Common/logger_useful.h>
namespace Aws::S3
{
class Client;
}
namespace DB
{
class StorageS3Queue : public IStorage, WithContext
{
public:
using Configuration = typename StorageS3::Configuration;
StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
const Configuration & configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const String & comment,
ContextPtr context_,
std::optional<FormatSettings> format_settings_,
ASTPtr partition_by_ = nullptr);
String getName() const override { return "S3Queue"; }
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
SinkToStoragePtr write(
const ASTPtr & query,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
bool async_insert) override;
void truncate(
const ASTPtr & /*query*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
ContextPtr /*local_context*/,
TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
bool supportsPartitionBy() const override;
const auto & getFormatName() const { return configuration.format; }
const String & getZooKeeperPath() const { return zk_path; }
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const S3QueueAction after_processing;
std::shared_ptr<S3QueueFilesMetadata> files_metadata;
Configuration configuration;
NamesAndTypesList virtual_columns;
Block virtual_block;
UInt64 reschedule_processing_interval_ms;
std::optional<FormatSettings> format_settings;
ASTPtr partition_by;
String zk_path;
mutable zkutil::ZooKeeperPtr zk_client;
mutable std::mutex zk_mutex;
std::atomic<bool> mv_attached = false;
std::atomic<bool> shutdown_called{false};
Poco::Logger * log;
bool supportsSubcolumns() const override;
bool withGlobs() const { return configuration.url.key.find_first_of("*?{") != std::string::npos; }
void threadFunc();
size_t getTableDependentCount() const;
bool hasDependencies(const StorageID & table_id);
void startup() override;
void shutdown() override;
void drop() override;
struct TaskContext
{
BackgroundSchedulePool::TaskHolder holder;
std::atomic<bool> stream_cancelled{false};
explicit TaskContext(BackgroundSchedulePool::TaskHolder && task_) : holder(std::move(task_)) { }
};
std::shared_ptr<TaskContext> task;
bool supportsSubsetOfColumns() const override;
const UInt32 zk_create_table_retries = 1000;
bool createTableIfNotExists(const StorageMetadataPtr & metadata_snapshot);
void checkTableStructure(const String & zookeeper_prefix, const StorageMetadataPtr & metadata_snapshot);
using KeysWithInfo = StorageS3QueueSource::KeysWithInfo;
std::shared_ptr<StorageS3QueueSource::IIterator>
createFileIterator(ContextPtr local_context, ASTPtr query);
void streamToViews();
Configuration updateConfigurationAndGetCopy(ContextPtr local_context);
};
}
#endif

View File

@ -596,7 +596,7 @@ StorageS3Source::ReaderHolder StorageS3Source::createReader()
auto pipeline = std::make_unique<QueryPipeline>(QueryPipelineBuilder::getPipeline(std::move(builder)));
auto current_reader = std::make_unique<PullingPipelineExecutor>(*pipeline);
return ReaderHolder{fs::path(bucket) / key_with_info.key, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
return ReaderHolder{key_with_info.key, bucket, std::move(read_buf), std::move(input_format), std::move(pipeline), std::move(current_reader)};
}
std::future<StorageS3Source::ReaderHolder> StorageS3Source::createReaderAsync()

View File

@ -141,6 +141,8 @@ public:
Chunk generate() override;
private:
friend class StorageS3QueueSource;
String name;
String bucket;
String version_id;
@ -157,12 +159,14 @@ private:
{
public:
ReaderHolder(
String path_,
String key_,
String bucket_,
std::unique_ptr<ReadBuffer> read_buf_,
std::shared_ptr<IInputFormat> input_format_,
std::unique_ptr<QueryPipeline> pipeline_,
std::unique_ptr<PullingPipelineExecutor> reader_)
: path(std::move(path_))
: key(std::move(key_))
, bucket(std::move(bucket_))
, read_buf(std::move(read_buf_))
, input_format(std::move(input_format_))
, pipeline(std::move(pipeline_))
@ -187,19 +191,22 @@ private:
pipeline = std::move(other.pipeline);
input_format = std::move(other.input_format);
read_buf = std::move(other.read_buf);
path = std::move(other.path);
key = std::move(other.key);
bucket = std::move(other.bucket);
return *this;
}
explicit operator bool() const { return reader != nullptr; }
PullingPipelineExecutor * operator->() { return reader.get(); }
const PullingPipelineExecutor * operator->() const { return reader.get(); }
const String & getPath() const { return path; }
String getPath() const { return fs::path(bucket) / key; }
const String & getFile() const { return key; }
const IInputFormat * getInputFormat() const { return input_format.get(); }
private:
String path;
String key;
String bucket;
std::unique_ptr<ReadBuffer> read_buf;
std::shared_ptr<IInputFormat> input_format;
std::unique_ptr<QueryPipeline> pipeline;
@ -323,6 +330,7 @@ protected:
private:
friend class StorageS3Cluster;
friend class TableFunctionS3Cluster;
friend class StorageS3Queue;
Configuration configuration;
std::mutex configuration_update_mutex;

View File

@ -34,6 +34,8 @@ void registerStorageS3(StorageFactory & factory);
void registerStorageCOS(StorageFactory & factory);
void registerStorageOSS(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageS3Queue(StorageFactory & factory);
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
#endif
@ -133,6 +135,7 @@ void registerStorages()
registerStorageCOS(factory);
registerStorageOSS(factory);
registerStorageHudi(factory);
registerStorageS3Queue(factory);
#if USE_PARQUET
registerStorageDeltaLake(factory);

View File

@ -0,0 +1,11 @@
<clickhouse>
<s3>
<s3_mock>
<endpoint>http://resolver:8080</endpoint>
<header>Authorization: Bearer TOKEN</header>
</s3_mock>
<s3_mock_restricted_directory>
<endpoint>http://resolver:8080/root-with-auth/restricteddirectory/</endpoint>
</s3_mock_restricted_directory>
</s3>
</clickhouse>

View File

@ -0,0 +1,43 @@
<clickhouse>
<named_collections>
<s3_conf1>
<url>http://minio1:9001/root/test_table</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_conf1>
<s3_parquet>
<url>http://minio1:9001/root/test_parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet>
<s3_parquet_gz>
<url>http://minio1:9001/root/test_parquet_gz</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet_gz>
<s3_orc>
<url>http://minio1:9001/root/test_orc</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_orc>
<s3_native>
<url>http://minio1:9001/root/test_native</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_native>
<s3_arrow>
<url>http://minio1:9001/root/test.arrow</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_arrow>
<s3_parquet2>
<url>http://minio1:9001/root/test.parquet</url>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
</s3_parquet2>
<s3_json_no_sign>
<url>http://minio1:9001/root/test_cache4.jsonl</url>
<no_sign_request>true</no_sign_request>
</s3_json_no_sign>
</named_collections>
</clickhouse>

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<stream_like_engine_allow_direct_select>1</stream_like_engine_allow_direct_select>
</default>
</profiles>
</clickhouse>

View File

@ -0,0 +1,873 @@
import io
import logging
import os
import random
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
import json
"""
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-server
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-client
export CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH=/home/sergey/vkr/ClickHouse/build/programs/clickhouse-odbc-bridge
export CLICKHOUSE_TESTS_BASE_CONFIG_DIR=/home/sergey/vkr/ClickHouse/programs/server
"""
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.
bucket_read_write_policy = {
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetBucketLocation",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::root",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:GetObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:PutObject",
"Resource": "arn:aws:s3:::root/*",
},
{
"Sid": "",
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "s3:DeleteObject",
"Resource": "arn:aws:s3:::root/*",
},
],
}
minio_client = started_cluster.minio_client
minio_client.set_bucket_policy(
started_cluster.minio_bucket, json.dumps(bucket_read_write_policy)
)
started_cluster.minio_restricted_bucket = "{}-with-auth".format(
started_cluster.minio_bucket
)
if minio_client.bucket_exists(started_cluster.minio_restricted_bucket):
minio_client.remove_bucket(started_cluster.minio_restricted_bucket)
minio_client.make_bucket(started_cluster.minio_restricted_bucket)
@pytest.fixture(autouse=True)
def s3_queue_setup_teardown(started_cluster):
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
instance.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
instance_2.query("DROP DATABASE IF EXISTS test; CREATE DATABASE test;")
minio = started_cluster.minio_client
objects = list(
minio.list_objects(started_cluster.minio_restricted_bucket, recursive=True)
)
for obj in objects:
minio.remove_object(started_cluster.minio_restricted_bucket, obj.object_name)
yield # run test
MINIO_INTERNAL_PORT = 9001
AVAILABLE_MODES = ["unordered", "ordered"]
AUTH = "'minio','minio123',"
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
def put_s3_file_content(started_cluster, bucket, filename, data):
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
def generate_random_files(
count, prefix, cluster, bucket, column_num=3, row_num=10, start_ind=0
):
total_values = []
to_generate = [
(f"{prefix}/test_{i}.csv", i) for i in range(start_ind, start_ind + count)
]
to_generate.sort(key=lambda x: x[0])
for filename, i in to_generate:
rand_values = [
[random.randint(0, 50) for _ in range(column_num)] for _ in range(row_num)
]
total_values += rand_values
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(cluster, bucket, filename, values_csv)
return total_values
# Returns content of given S3 file as string.
def get_s3_file_content(started_cluster, bucket, filename, decode=True):
# type: (ClickHouseCluster, str, str, bool) -> str
data = started_cluster.minio_client.get_object(bucket, filename)
data_str = b""
for chunk in data.stream():
data_str += chunk
if decode:
return data_str.decode()
return data_str
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
)
cluster.add_instance(
"instance2",
user_configs=["configs/users.xml"],
with_minio=True,
with_zookeeper=True,
main_configs=["configs/defaultS3.xml", "configs/named_collections.xml"],
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
prepare_s3_bucket(cluster)
yield cluster
finally:
cluster.shutdown()
def run_query(instance, query, stdin=None, settings=None):
# type: (ClickHouseInstance, str, object, dict) -> str
logging.info("Running query '{}'...".format(query))
result = instance.query(query, stdin=stdin, settings=settings)
logging.info("Query finished")
return result
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
prefix = "delete"
bucket = started_cluster.minio_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(5, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_delete_{mode}',
s3queue_loading_retries = 3,
after_processing='delete';
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == total_values
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
["failed", 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_failed_retry_{mode}',
s3queue_loading_retries = 3;
"""
)
# first try
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# second try
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# upload correct file
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, bucket, filename, values_csv)
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_file(started_cluster, mode):
auth = "'minio','minio123',"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
values = [
[12549, 2463, 19893],
[64021, 38652, 66703],
[81611, 39650, 83516],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"test.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
instance.query(
"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_2;
DROP TABLE IF EXISTS test.s3_queue_3;
"""
)
instance.query(
f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
instance.query(
f"""
CREATE TABLE test.s3_queue_2 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_{mode}'
"""
)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with same zookeeper path
get_query = f"SELECT * FROM test.s3_queue_2"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == []
# New table with different zookeeper path
instance.query(
f"""
CREATE TABLE test.s3_queue_3 ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/*', {auth}'CSV')
SETTINGS
mode = '{mode}',
keeper_path='/clickhouse/select_{mode}_2'
"""
)
get_query = f"SELECT * FROM test.s3_queue_3"
assert [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
] == values
values = [
[1, 1, 1],
]
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
filename = f"t.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue_3"
if mode == "unordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == values
elif mode == "ordered":
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == []
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_direct_select_multiple_files(started_cluster, mode):
prefix = f"multiple_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query("drop table if exists test.s3_queue")
instance.query(
f"""
CREATE TABLE test.s3_queue ({table_format})
ENGINE = S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/select_multiple_{mode}'
"""
)
for i in range(5):
rand_values = [[random.randint(0, 50) for _ in range(3)] for _ in range(10)]
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
filename = f"{prefix}/test_{i}.csv"
put_s3_file_content(started_cluster, bucket, filename, values_csv)
get_query = f"SELECT * FROM test.s3_queue"
assert [
list(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
] == rand_values
total_values = generate_random_files(
4, prefix, started_cluster, bucket, start_ind=5
)
get_query = f"SELECT * FROM test.s3_queue"
assert {
tuple(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
} == set([tuple(i) for i in total_values])
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_view_(started_cluster, mode):
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
total_values = generate_random_files(10, prefix, started_cluster, bucket)
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
expected_values = set([tuple(i) for i in total_values])
for i in range(10):
get_query = f"SELECT * FROM test.persistent_s3_queue_mv"
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if selected_values != expected_values:
time.sleep(1)
else:
break
assert selected_values == expected_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_streaming_to_many_views(started_cluster, mode):
prefix = f"streaming_files_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
retry_cnt = 10
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_2;
DROP TABLE IF EXISTS test.s3_queue_persistent_3;
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_3;
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_3 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/multiple_view_{mode}';
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_2 TO test.s3_queue_persistent_2 AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_3 TO test.s3_queue_persistent_3 AS
SELECT
*
FROM test.s3_queue;
"""
)
total_values = generate_random_files(5, prefix, started_cluster, bucket)
expected_values = set([tuple(i) for i in total_values])
for i in range(retry_cnt):
retry = False
for get_query in [
f"SELECT * FROM test.s3_queue_persistent",
f"SELECT * FROM test.s3_queue_persistent_2",
f"SELECT * FROM test.s3_queue_persistent_3",
]:
selected_values = {
tuple(map(int, l.split()))
for l in run_query(instance, get_query).splitlines()
}
if i == retry_cnt - 1:
assert selected_values == expected_values
if selected_values != expected_values:
retry = True
break
if retry:
time.sleep(1)
else:
break
def test_multiple_tables_meta_mismatch(started_cluster):
prefix = f"test_meta"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
# check mode
failed = False
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in engine mode" in str(e)
failed = True
assert failed is True
# check columns
table_format_copy = table_format + ", column4 UInt32"
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format_copy})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert (
"Table columns structure in ZooKeeper is different from local table structure"
in str(e)
)
failed = True
assert failed is True
# check format
try:
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'TSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
except QueryRuntimeException as e:
assert "Existing table metadata in ZooKeeper differs in format name" in str(e)
failed = True
assert failed is True
# create working engine
instance.query(
f"""
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'ordered',
keeper_path = '/clickhouse/test_meta';
"""
)
def test_max_set_age(started_cluster):
files_to_generate = 10
max_age = 1
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_age',
s3queue_tracked_files_limit = 10,
s3queue_tracked_file_ttl_sec = {max_age};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
time.sleep(max_age + 1)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync(started_cluster, mode):
files_to_generate = 300
poll_size = 30
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_copy;
DROP TABLE IF EXISTS test.s3_queue_copy_2;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy;
DROP TABLE IF EXISTS test.s3_queue_persistent_copy_2;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv_copy_2;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_copy_2 ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_sync_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE TABLE test.s3_queue_persistent_copy_2 ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy TO test.s3_queue_persistent_copy AS
SELECT
*
FROM test.s3_queue_copy;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv_copy_2 TO test.s3_queue_persistent_copy_2 AS
SELECT
*
FROM test.s3_queue_copy_2;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
def get_count(table_name):
return int(run_query(instance, f"SELECT count() FROM {table_name}"))
for _ in range(100):
if (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
get_query_copy = f"SELECT * FROM test.s3_queue_persistent_copy"
res2 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy).splitlines()
]
get_query_copy_2 = f"SELECT * FROM test.s3_queue_persistent_copy_2"
res3 = [
list(map(int, l.split()))
for l in run_query(instance, get_query_copy_2).splitlines()
]
assert {tuple(v) for v in res1 + res2 + res3} == set(
[tuple(i) for i in total_values]
)
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count("test.s3_queue_persistent")
+ get_count("test.s3_queue_persistent_copy")
+ get_count("test.s3_queue_persistent_copy_2")
) == files_to_generate
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_multiple_tables_streaming_sync_distributed(started_cluster, mode):
files_to_generate = 100
poll_size = 10
prefix = f"test_multiple_{mode}"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
instance_2 = started_cluster.instances["instance2"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
for inst in [instance, instance_2]:
inst.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
DROP TABLE IF EXISTS test.s3_queue_persistent;
DROP TABLE IF EXISTS test.persistent_s3_queue_mv;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = '{mode}',
keeper_path = '/clickhouse/test_multiple_consumers_{mode}',
s3queue_polling_size = {poll_size};
CREATE TABLE test.s3_queue_persistent ({table_format})
ENGINE = MergeTree()
ORDER BY column1;
CREATE MATERIALIZED VIEW test.persistent_s3_queue_mv TO test.s3_queue_persistent AS
SELECT
*
FROM test.s3_queue;
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, row_num=1
)
def get_count(node, table_name):
return int(run_query(node, f"SELECT count() FROM {table_name}"))
for _ in range(100):
if (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
) == files_to_generate:
break
time.sleep(1)
get_query = f"SELECT * FROM test.s3_queue_persistent"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
res2 = [
list(map(int, l.split())) for l in run_query(instance_2, get_query).splitlines()
]
# Checking that all engines have made progress
assert len(res1) > 0
assert len(res2) > 0
assert len(res1) + len(res2) == files_to_generate
assert {tuple(v) for v in res1 + res2} == set([tuple(i) for i in total_values])
# Checking that all files were processed only once
time.sleep(10)
assert (
get_count(instance, "test.s3_queue_persistent")
+ get_count(instance_2, "test.s3_queue_persistent")
) == files_to_generate
def test_max_set_size(started_cluster):
files_to_generate = 10
prefix = f"test_multiple"
bucket = started_cluster.minio_restricted_bucket
instance = started_cluster.instances["instance"]
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
instance.query(
f"""
DROP TABLE IF EXISTS test.s3_queue;
CREATE TABLE test.s3_queue ({table_format})
ENGINE=S3Queue('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{prefix}/*', {AUTH}'CSV')
SETTINGS
mode = 'unordered',
keeper_path = '/clickhouse/test_set_size',
s3queue_tracked_files_limit = {files_to_generate - 1};
"""
)
total_values = generate_random_files(
files_to_generate, prefix, started_cluster, bucket, start_ind=0, row_num=1
)
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == total_values
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[0]]
get_query = f"SELECT * FROM test.s3_queue"
res1 = [
list(map(int, l.split())) for l in run_query(instance, get_query).splitlines()
]
assert res1 == [total_values[1]]