Merge pull request #65458 from ClickHouse/add-azure-queue-storage

Add AzureQueue storage
Kseniia Sumarokova 2024-06-28 12:39:05 +00:00 committed by GitHub
commit 2673a773c3
38 changed files with 739 additions and 626 deletions

View File

@ -28,6 +28,8 @@ CREATE TABLE s3_queue_engine_table (name String, value UInt32)
[s3queue_cleanup_interval_max_ms = 30000,]
```
Starting with `24.7`, settings without the `s3queue_` prefix are also supported.
**Engine parameters**
- `path` — Bucket URL with a path to the file. Supports the following wildcards in readonly mode: `*`, `**`, `?`, `{abc,def}` and `{N..M}` where `N`, `M` are numbers and `'abc'`, `'def'` are strings. For more information see [below](#wildcards-in-path).

View File

@ -222,7 +222,7 @@ add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_storages_s3queue Storages/ObjectStorageQueue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)

View File

@ -637,11 +637,11 @@ The server successfully detected this situation and will download merged part fr
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueFailedFiles, "Number of files which failed to be processed")\
M(S3QueueProcessedFiles, "Number of files which were processed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent to set file as failed")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
M(ObjectStorageQueueFailedFiles, "Number of files which failed to be processed")\
M(ObjectStorageQueueProcessedFiles, "Number of files which were processed")\
M(ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent in background cleanup of tracked files (max set size or TTL)")\
M(ObjectStorageQueuePullMicroseconds, "Time spent to read file data")\
M(ObjectStorageQueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
\
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
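
For context, a minimal sketch of how the renamed counters above are consumed elsewhere in this commit: the increment and timer calls mirror `ObjectStorageQueueIFileMetadata::setProcessed()` and the cleanup code further down; the wrapper functions here are illustrative only.

```cpp
namespace ProfileEvents
{
    /// Translation units declare the events they use as externs.
    extern const Event ObjectStorageQueueProcessedFiles;
    extern const Event ObjectStorageQueuePullMicroseconds;
}

void onFileProcessed()
{
    /// Bumps the counter visible in system.events.
    ProfileEvents::increment(ProfileEvents::ObjectStorageQueueProcessedFiles);
}

void pullOneChunk()
{
    /// Scoped timer: adds the elapsed microseconds to the event when it goes out of scope.
    auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds);
    /// ... read file data ...
}
```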

View File

@ -11,7 +11,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>

View File

@ -25,7 +25,7 @@
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(S3QueueLogElement) \
M(ObjectStorageQueueLogElement) \
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \

View File

@ -201,13 +201,13 @@ IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
IMPLEMENT_SETTING_ENUM(S3QueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", S3QueueMode::ORDERED},
{"unordered", S3QueueMode::UNORDERED}})
IMPLEMENT_SETTING_ENUM(ObjectStorageQueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", ObjectStorageQueueMode::ORDERED},
{"unordered", ObjectStorageQueueMode::UNORDERED}})
IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", S3QueueAction::KEEP},
{"delete", S3QueueAction::DELETE}})
IMPLEMENT_SETTING_ENUM(ObjectStorageQueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", ObjectStorageQueueAction::KEEP},
{"delete", ObjectStorageQueueAction::DELETE}})
IMPLEMENT_SETTING_ENUM(ExternalCommandStderrReaction, ErrorCodes::BAD_ARGUMENTS,
{{"none", ExternalCommandStderrReaction::NONE},

View File

@ -341,21 +341,21 @@ DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
enum class S3QueueMode : uint8_t
enum class ObjectStorageQueueMode : uint8_t
{
ORDERED,
UNORDERED,
};
DECLARE_SETTING_ENUM(S3QueueMode)
DECLARE_SETTING_ENUM(ObjectStorageQueueMode)
enum class S3QueueAction : uint8_t
enum class ObjectStorageQueueAction : uint8_t
{
KEEP,
DELETE,
};
DECLARE_SETTING_ENUM(S3QueueAction)
DECLARE_SETTING_ENUM(ObjectStorageQueueAction)
DECLARE_SETTING_ENUM(ExternalCommandStderrReaction)

View File

@ -4135,7 +4135,7 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
return shared->system_logs->filesystem_cache_log;
}
std::shared_ptr<S3QueueLog> Context::getS3QueueLog() const
std::shared_ptr<ObjectStorageQueueLog> Context::getS3QueueLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
@ -4144,6 +4144,15 @@ std::shared_ptr<S3QueueLog> Context::getS3QueueLog() const
return shared->system_logs->s3_queue_log;
}
std::shared_ptr<ObjectStorageQueueLog> Context::getAzureQueueLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->azure_queue_log;
}
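
A hedged call-site sketch (the `is_azure` flag and wrapper function are hypothetical, not part of this commit): both accessors return the same `ObjectStorageQueueLog` type, so a queue storage can pick the log matching its backing object storage.

```cpp
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ContextPtr & context, bool is_azure)
{
    /// Same element type either way; only the target system table differs
    /// (system.s3queue_log vs system.azure_queue_log).
    return is_azure ? context->getAzureQueueLog() : context->getS3QueueLog();
}
```
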
std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
SharedLockGuard lock(shared->mutex);

View File

@ -107,7 +107,7 @@ class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class S3QueueLog;
class ObjectStorageQueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
@ -1133,7 +1133,8 @@ public:
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<S3QueueLog> getS3QueueLog() const;
std::shared_ptr<ObjectStorageQueueLog> getS3QueueLog() const;
std::shared_ptr<ObjectStorageQueueLog> getAzureQueueLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;

View File

@ -8,19 +8,19 @@
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/ProfileEventsExt.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
namespace DB
{
ColumnsDescription S3QueueLogElement::getColumnsDescription()
ColumnsDescription ObjectStorageQueueLogElement::getColumnsDescription()
{
auto status_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Processed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Processed)},
{"Failed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Failed)},
{"Processed", static_cast<Int8>(ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed)},
{"Failed", static_cast<Int8>(ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed)},
});
return ColumnsDescription
@ -41,7 +41,7 @@ ColumnsDescription S3QueueLogElement::getColumnsDescription()
};
}
void S3QueueLogElement::appendToBlock(MutableColumns & columns) const
void ObjectStorageQueueLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(getFQDNOrHostName());

View File

@ -9,7 +9,7 @@
namespace DB
{
struct S3QueueLogElement
struct ObjectStorageQueueLogElement
{
time_t event_time{};
@ -20,18 +20,18 @@ struct S3QueueLogElement
std::string file_name;
size_t rows_processed = 0;
enum class S3QueueStatus : uint8_t
enum class ObjectStorageQueueStatus : uint8_t
{
Processed,
Failed,
};
S3QueueStatus status;
ObjectStorageQueueStatus status;
ProfileEvents::Counters::Snapshot counters_snapshot;
time_t processing_start_time;
time_t processing_end_time;
std::string exception;
static std::string name() { return "S3QueueLog"; }
static std::string name() { return "ObjectStorageQueueLog"; }
static ColumnsDescription getColumnsDescription();
static NamesAndAliases getNamesAndAliases() { return {}; }
@ -39,9 +39,9 @@ struct S3QueueLogElement
void appendToBlock(MutableColumns & columns) const;
};
class S3QueueLog : public SystemLog<S3QueueLogElement>
class ObjectStorageQueueLog : public SystemLog<ObjectStorageQueueLogElement>
{
using SystemLog<S3QueueLogElement>::SystemLog;
using SystemLog<ObjectStorageQueueLogElement>::SystemLog;
};
}
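
A minimal usage sketch for the struct above (field values and the helper are illustrative; `add()` comes from `SystemLog`):

```cpp
void logProcessedFile(const std::shared_ptr<ObjectStorageQueueLog> & queue_log, const std::string & file_name, size_t rows)
{
    if (!queue_log)
        return; /// Logging is optional (see the enable_logging_to_queue_log setting).

    ObjectStorageQueueLogElement elem;
    elem.event_time = time(nullptr);
    elem.file_name = file_name;
    elem.rows_processed = rows;
    elem.status = ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed;
    queue_log->add(std::move(elem));
}
```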

View File

@ -25,7 +25,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
@ -306,7 +306,8 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log", "Contains profiling information on processors level (building blocks for a pipeline for query execution).");
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log", "Contains a history for all asynchronous inserts executed on current server.");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log", "Contains logging entries with the information about BACKUP and RESTORE operations.");
s3_queue_log = createSystemLog<S3QueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
s3_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with information about files processed by the S3Queue engine.");
azure_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with information about files processed by the AzureQueue engine.");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes.");
if (query_log)

View File

@ -53,7 +53,7 @@ class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class S3QueueLog;
class ObjectStorageQueueLog;
class BlobStorageLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
@ -76,7 +76,8 @@ struct SystemLogs
std::shared_ptr<ErrorLog> error_log; /// Used to log errors.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<S3QueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> azure_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.

View File

@ -15,7 +15,7 @@ class SchemaCache;
class StorageObjectStorageSource : public SourceWithKeyCondition, WithContext
{
friend class StorageS3QueueSource;
friend class ObjectStorageQueueSource;
public:
using Configuration = StorageObjectStorage::Configuration;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Common/SipHash.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
@ -11,8 +11,8 @@
namespace ProfileEvents
{
extern const Event S3QueueProcessedFiles;
extern const Event S3QueueFailedFiles;
extern const Event ObjectStorageQueueProcessedFiles;
extern const Event ObjectStorageQueueFailedFiles;
};
namespace DB
@ -35,25 +35,25 @@ namespace
}
}
void S3QueueIFileMetadata::FileStatus::setProcessingEndTime()
void ObjectStorageQueueIFileMetadata::FileStatus::setProcessingEndTime()
{
processing_end_time = now();
}
void S3QueueIFileMetadata::FileStatus::onProcessing()
void ObjectStorageQueueIFileMetadata::FileStatus::onProcessing()
{
state = FileStatus::State::Processing;
processing_start_time = now();
}
void S3QueueIFileMetadata::FileStatus::onProcessed()
void ObjectStorageQueueIFileMetadata::FileStatus::onProcessed()
{
state = FileStatus::State::Processed;
if (!processing_end_time)
setProcessingEndTime();
}
void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
void ObjectStorageQueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
{
state = FileStatus::State::Failed;
if (!processing_end_time)
@ -62,13 +62,13 @@ void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
last_exception = exception;
}
std::string S3QueueIFileMetadata::FileStatus::getException() const
std::string ObjectStorageQueueIFileMetadata::FileStatus::getException() const
{
std::lock_guard lock(last_exception_mutex);
return last_exception;
}
std::string S3QueueIFileMetadata::NodeMetadata::toString() const
std::string ObjectStorageQueueIFileMetadata::NodeMetadata::toString() const
{
Poco::JSON::Object json;
json.set("file_path", file_path);
@ -83,7 +83,7 @@ std::string S3QueueIFileMetadata::NodeMetadata::toString() const
return oss.str();
}
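
For reference, a self-contained sketch of the serialization above (the field values are made up; the field set follows `createNodeMetadata()` below):

```cpp
#include <Poco/JSON/Object.h>
#include <iostream>
#include <sstream>

int main()
{
    Poco::JSON::Object json;
    json.set("file_path", "data/file-0001.csv");
    json.set("last_processed_timestamp", 1719571200);
    json.set("last_exception", "");
    json.set("retries", 0);

    std::ostringstream oss;
    json.stringify(oss);
    /// Prints the node payload, e.g. {"file_path":"data/file-0001.csv","retries":0,...}
    std::cout << oss.str() << std::endl;
}
```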
S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::NodeMetadata::fromString(const std::string & metadata_str)
ObjectStorageQueueIFileMetadata::NodeMetadata ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(const std::string & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
@ -98,7 +98,7 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::NodeMetadata::fromStrin
return metadata;
}
S3QueueIFileMetadata::S3QueueIFileMetadata(
ObjectStorageQueueIFileMetadata::ObjectStorageQueueIFileMetadata(
const std::string & path_,
const std::string & processing_node_path_,
const std::string & processed_node_path_,
@ -123,7 +123,7 @@ S3QueueIFileMetadata::S3QueueIFileMetadata(
processed_node_path, processing_node_path, failed_node_path);
}
S3QueueIFileMetadata::~S3QueueIFileMetadata()
ObjectStorageQueueIFileMetadata::~ObjectStorageQueueIFileMetadata()
{
if (processing_id_version.has_value())
{
@ -162,9 +162,9 @@ S3QueueIFileMetadata::~S3QueueIFileMetadata()
}
}
std::string S3QueueIFileMetadata::getNodeName(const std::string & path)
std::string ObjectStorageQueueIFileMetadata::getNodeName(const std::string & path)
{
/// Since with are dealing with paths in s3 which can have "/",
/// Since we are dealing with paths in object storage which can have "/",
/// we cannot create a zookeeper node with the name equal to path.
/// Therefore we use a hash of the path as a node name.
@ -173,7 +173,7 @@ std::string S3QueueIFileMetadata::getNodeName(const std::string & path)
return toString(path_hash.get64());
}
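
The elided body boils down to hashing; a sketch, using the same `sipHash64` helper this commit uses for bucket assignment:

```cpp
std::string nodeNameForPath(const std::string & path)
{
    /// "data/a/b.csv" cannot be a znode name (it contains '/'),
    /// so the node is named by the decimal 64-bit SipHash of the path.
    return toString(sipHash64(path));
}
```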
S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
ObjectStorageQueueIFileMetadata::NodeMetadata ObjectStorageQueueIFileMetadata::createNodeMetadata(
const std::string & path,
const std::string & exception,
size_t retries)
@ -182,9 +182,9 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
/// Since node name is just a hash we want to know to which file it corresponds,
/// so we keep "file_path" in nodes data.
/// "last_processed_timestamp" is needed for TTL metadata nodes enabled by s3queue_tracked_file_ttl_sec.
/// "last_exception" is kept for introspection, should also be visible in system.s3queue_log if it is enabled.
/// "retries" is kept for retrying the processing enabled by s3queue_loading_retries.
/// "last_processed_timestamp" is needed for TTL metadata nodes enabled by tracked_file_ttl_sec.
/// "last_exception" is kept for introspection, should also be visible in system.s3(azure)queue_log if it is enabled.
/// "retries" is kept for retrying the processing enabled by loading_retries.
NodeMetadata metadata;
metadata.file_path = path;
metadata.last_processed_timestamp = now();
@ -193,7 +193,7 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
return metadata;
}
std::string S3QueueIFileMetadata::getProcessorInfo(const std::string & processor_id)
std::string ObjectStorageQueueIFileMetadata::getProcessorInfo(const std::string & processor_id)
{
/// Add information which will be useful for debugging just in case.
Poco::JSON::Object json;
@ -206,7 +206,7 @@ std::string S3QueueIFileMetadata::getProcessorInfo(const std::string & processor
return oss.str();
}
bool S3QueueIFileMetadata::setProcessing()
bool ObjectStorageQueueIFileMetadata::setProcessing()
{
auto state = file_status->state.load();
if (state == FileStatus::State::Processing
@ -235,11 +235,11 @@ bool S3QueueIFileMetadata::setProcessing()
return success;
}
void S3QueueIFileMetadata::setProcessed()
void ObjectStorageQueueIFileMetadata::setProcessed()
{
LOG_TRACE(log, "Setting file {} as processed (path: {})", path, processed_node_path);
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueProcessedFiles);
file_status->onProcessed();
try
@ -258,12 +258,12 @@ void S3QueueIFileMetadata::setProcessed()
LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
}
void S3QueueIFileMetadata::setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status)
void ObjectStorageQueueIFileMetadata::setFailed(const std::string & exception_message, bool reduce_retry_count, bool overwrite_status)
{
LOG_TRACE(log, "Setting file {} as failed (path: {}, reduce retry count: {}, exception: {})",
path, failed_node_path, reduce_retry_count, exception_message);
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueFailedFiles);
if (overwrite_status || file_status->state != FileStatus::State::Failed)
file_status->onFailed(exception_message);
@ -295,7 +295,7 @@ void S3QueueIFileMetadata::setFailed(const std::string & exception_message, bool
LOG_TRACE(log, "Set file {} as failed (rows: {})", path, file_status->processed_rows);
}
void S3QueueIFileMetadata::setFailedNonRetriable()
void ObjectStorageQueueIFileMetadata::setFailedNonRetriable()
{
auto zk_client = getZooKeeper();
Coordination::Requests requests;
@ -326,7 +326,7 @@ void S3QueueIFileMetadata::setFailedNonRetriable()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error while setting file as failed: {}", code);
}
void S3QueueIFileMetadata::setFailedRetriable()
void ObjectStorageQueueIFileMetadata::setFailedRetriable()
{
/// Instead of creating a persistent /failed/node_hash node
/// we create a persistent /failed/node_hash.retriable node.
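
A hedged sketch of the naming convention described in this comment (the helper is illustrative, not part of the commit):

```cpp
std::string failedNodePathFor(const std::string & failed_root, const std::string & node_hash, bool retries_left)
{
    /// While retries remain, the failure is tracked under a ".retriable" sibling;
    /// only a non-retriable failure creates the plain /failed/<hash> node.
    return failed_root + "/" + node_hash + (retries_left ? ".retriable" : "");
}
```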

View File

@ -6,7 +6,7 @@
namespace DB
{
class S3QueueIFileMetadata
class ObjectStorageQueueIFileMetadata
{
public:
struct FileStatus
@ -42,7 +42,7 @@ public:
};
using FileStatusPtr = std::shared_ptr<FileStatus>;
explicit S3QueueIFileMetadata(
explicit ObjectStorageQueueIFileMetadata(
const std::string & path_,
const std::string & processing_node_path_,
const std::string & processed_node_path_,
@ -51,7 +51,7 @@ public:
size_t max_loading_retries_,
LoggerPtr log_);
virtual ~S3QueueIFileMetadata();
virtual ~ObjectStorageQueueIFileMetadata();
bool setProcessing();
void setProcessed();
@ -95,7 +95,7 @@ protected:
LoggerPtr log;
/// processing node is ephemeral, so we cannot verify with it if
/// this node was created by a certain processor on a previous s3 queue processing stage,
/// this node was created by a certain processor on a previous processing stage,
/// because we could get a session expired in between the stages
/// and someone else could just create this processing node.
/// Therefore we also create a persistent processing node
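
A hedged sketch of the guard this enables (`processing_node_id_path` is an assumed name for the persistent node; `processing_id_version` is the member stored by this class; the exact request list varies per operation):

```cpp
Coordination::Requests requests;
/// Fail the whole multi-op if another processor re-created the persistent
/// processing-id node after our session expired.
requests.push_back(zkutil::makeCheckRequest(processing_node_id_path, processing_id_version.value()));
requests.push_back(zkutil::makeRemoveRequest(processing_node_path, -1));
```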

View File

@ -4,13 +4,12 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueUnorderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <IO/S3Settings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/StorageSnapshot.h>
#include <base/sleep.h>
#include <Common/CurrentThread.h>
@ -22,13 +21,8 @@
namespace ProfileEvents
{
extern const Event S3QueueSetFileProcessingMicroseconds;
extern const Event S3QueueSetFileProcessedMicroseconds;
extern const Event S3QueueSetFileFailedMicroseconds;
extern const Event S3QueueFailedFiles;
extern const Event S3QueueProcessedFiles;
extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event S3QueueLockLocalFileStatusesMicroseconds;
extern const Event ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event ObjectStorageQueueLockLocalFileStatusesMicroseconds;
};
namespace DB
@ -63,7 +57,7 @@ namespace
}
}
class S3QueueMetadata::LocalFileStatuses
class ObjectStorageQueueMetadata::LocalFileStatuses
{
public:
LocalFileStatuses() = default;
@ -109,98 +103,89 @@ private:
std::unique_lock<std::mutex> lock() const
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueLockLocalFileStatusesMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueueLockLocalFileStatusesMicroseconds);
return std::unique_lock(mutex);
}
};
S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_)
ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_)
: settings(settings_)
, zookeeper_path(zookeeper_path_)
, buckets_num(getBucketsNum(settings_))
, log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")"))
, log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")"))
, local_file_statuses(std::make_shared<LocalFileStatuses>())
{
if (settings.mode == S3QueueMode::UNORDERED
&& (settings.s3queue_tracked_files_limit || settings.s3queue_tracked_file_ttl_sec))
if (settings.mode == ObjectStorageQueueMode::UNORDERED
&& (settings.tracked_files_limit || settings.tracked_file_ttl_sec))
{
task = Context::getGlobalContextInstance()->getSchedulePool().createTask(
"S3QueueCleanupFunc",
"ObjectStorageQueueCleanupFunc",
[this] { cleanupThreadFunc(); });
task->activate();
task->scheduleAfter(
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
settings.cleanup_interval_min_ms, settings.cleanup_interval_max_ms));
}
LOG_TRACE(log, "Mode: {}, buckets: {}, processing threads: {}, result buckets num: {}",
settings.mode.toString(), settings.s3queue_buckets, settings.s3queue_processing_threads_num, buckets_num);
settings.mode.toString(), settings.buckets, settings.processing_threads_num, buckets_num);
}
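
`generateRescheduleInterval` itself is elided from this hunk; a plausible sketch, assuming uniform jitter between the two bounds (as the min/max setting names suggest):

```cpp
#include <pcg_random.hpp>
#include <Common/randomSeed.h>

size_t generateRescheduleInterval(size_t min_ms, size_t max_ms)
{
    /// Random point in [min_ms, max_ms] so replicas don't run cleanup in lockstep.
    pcg64 rng(randomSeed());
    return min_ms + rng() % (max_ms - min_ms + 1);
}
```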
S3QueueMetadata::~S3QueueMetadata()
ObjectStorageQueueMetadata::~ObjectStorageQueueMetadata()
{
shutdown();
}
void S3QueueMetadata::shutdown()
void ObjectStorageQueueMetadata::shutdown()
{
shutdown_called = true;
if (task)
task->deactivate();
}
void S3QueueMetadata::checkSettings(const S3QueueSettings & settings_) const
void ObjectStorageQueueMetadata::checkSettings(const ObjectStorageQueueSettings & settings_) const
{
S3QueueTableMetadata::checkEquals(settings, settings_);
ObjectStorageQueueTableMetadata::checkEquals(settings, settings_);
}
S3QueueMetadata::FileStatusPtr S3QueueMetadata::getFileStatus(const std::string & path)
ObjectStorageQueueMetadata::FileStatusPtr ObjectStorageQueueMetadata::getFileStatus(const std::string & path)
{
return local_file_statuses->get(path, /* create */false);
}
S3QueueMetadata::FileStatuses S3QueueMetadata::getFileStatuses() const
ObjectStorageQueueMetadata::FileStatuses ObjectStorageQueueMetadata::getFileStatuses() const
{
return local_file_statuses->getAll();
}
S3QueueMetadata::FileMetadataPtr S3QueueMetadata::getFileMetadata(
ObjectStorageQueueMetadata::FileMetadataPtr ObjectStorageQueueMetadata::getFileMetadata(
const std::string & path,
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info)
ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info)
{
auto file_status = local_file_statuses->get(path, /* create */true);
switch (settings.mode.value)
{
case S3QueueMode::ORDERED:
return std::make_shared<S3QueueOrderedFileMetadata>(
case ObjectStorageQueueMode::ORDERED:
return std::make_shared<ObjectStorageQueueOrderedFileMetadata>(
zookeeper_path,
path,
file_status,
bucket_info,
buckets_num,
settings.s3queue_loading_retries,
settings.loading_retries,
log);
case S3QueueMode::UNORDERED:
return std::make_shared<S3QueueUnorderedFileMetadata>(
case ObjectStorageQueueMode::UNORDERED:
return std::make_shared<ObjectStorageQueueUnorderedFileMetadata>(
zookeeper_path,
path,
file_status,
settings.s3queue_loading_retries,
settings.loading_retries,
log);
}
}
size_t S3QueueMetadata::getBucketsNum(const S3QueueSettings & settings)
{
if (settings.s3queue_buckets)
return settings.s3queue_buckets;
if (settings.s3queue_processing_threads_num)
return settings.s3queue_processing_threads_num;
return 0;
}
size_t S3QueueMetadata::getBucketsNum(const S3QueueTableMetadata & settings)
size_t ObjectStorageQueueMetadata::getBucketsNum(const ObjectStorageQueueSettings & settings)
{
if (settings.buckets)
return settings.buckets;
@ -209,32 +194,41 @@ size_t S3QueueMetadata::getBucketsNum(const S3QueueTableMetadata & settings)
return 0;
}
bool S3QueueMetadata::useBucketsForProcessing() const
size_t ObjectStorageQueueMetadata::getBucketsNum(const ObjectStorageQueueTableMetadata & settings)
{
return settings.mode == S3QueueMode::ORDERED && (buckets_num > 1);
if (settings.buckets)
return settings.buckets;
if (settings.processing_threads_num)
return settings.processing_threads_num;
return 0;
}
S3QueueMetadata::Bucket S3QueueMetadata::getBucketForPath(const std::string & path) const
bool ObjectStorageQueueMetadata::useBucketsForProcessing() const
{
return S3QueueOrderedFileMetadata::getBucketForPath(path, buckets_num);
return settings.mode == ObjectStorageQueueMode::ORDERED && (buckets_num > 1);
}
S3QueueOrderedFileMetadata::BucketHolderPtr
S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
ObjectStorageQueueMetadata::Bucket ObjectStorageQueueMetadata::getBucketForPath(const std::string & path) const
{
return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
return ObjectStorageQueueOrderedFileMetadata::getBucketForPath(path, buckets_num);
}
void S3QueueMetadata::initialize(
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr
ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
{
return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor, log);
}
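
Taken together, a hedged sketch of the Ordered-mode flow these helpers support (the surrounding loop, `object_path` and `processor_id` are illustrative):

```cpp
if (metadata->useBucketsForProcessing())
{
    auto bucket = metadata->getBucketForPath(object_path);
    /// Only one processor may hold a bucket at a time; a null holder means someone else has it.
    if (auto holder = metadata->tryAcquireBucket(bucket, /* processor */ toString(processor_id)))
    {
        /// ... process keys that hash to `bucket` ...
        holder->release();
    }
}
```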
void ObjectStorageQueueMetadata::initialize(
const ConfigurationPtr & configuration,
const StorageInMemoryMetadata & storage_metadata)
{
const auto metadata_from_table = S3QueueTableMetadata(*configuration, settings, storage_metadata);
const auto metadata_from_table = ObjectStorageQueueTableMetadata(*configuration, settings, storage_metadata);
const auto & columns_from_table = storage_metadata.getColumns();
const auto table_metadata_path = zookeeper_path / "metadata";
const auto metadata_paths = settings.mode == S3QueueMode::ORDERED
? S3QueueOrderedFileMetadata::getMetadataPaths(buckets_num)
: S3QueueUnorderedFileMetadata::getMetadataPaths();
const auto metadata_paths = settings.mode == ObjectStorageQueueMode::ORDERED
? ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(buckets_num)
: ObjectStorageQueueUnorderedFileMetadata::getMetadataPaths();
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zookeeper_path);
@ -243,7 +237,7 @@ void S3QueueMetadata::initialize(
{
if (zookeeper->exists(table_metadata_path))
{
const auto metadata_from_zk = S3QueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata"));
const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata"));
const auto columns_from_zk = ColumnsDescription::parse(metadata_from_zk.columns);
metadata_from_table.checkEquals(metadata_from_zk);
@ -268,8 +262,8 @@ void S3QueueMetadata::initialize(
requests.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
}
if (!settings.s3queue_last_processed_path.value.empty())
getFileMetadata(settings.s3queue_last_processed_path)->setProcessedAtStartRequests(requests, zookeeper);
if (!settings.last_processed_path.value.empty())
getFileMetadata(settings.last_processed_path)->setProcessedAtStartRequests(requests, zookeeper);
Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses);
@ -293,10 +287,10 @@ void S3QueueMetadata::initialize(
"of wrong zookeeper path or because of logical error");
}
void S3QueueMetadata::cleanupThreadFunc()
void ObjectStorageQueueMetadata::cleanupThreadFunc()
{
/// A background task is responsible for maintaining
/// settings.s3queue_tracked_files_limit and max_set_age settings for `unordered` processing mode.
/// settings.tracked_files_limit and settings.tracked_file_ttl_sec settings for `unordered` processing mode.
if (shutdown_called)
return;
@ -315,12 +309,12 @@ void S3QueueMetadata::cleanupThreadFunc()
task->scheduleAfter(
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
settings.cleanup_interval_min_ms, settings.cleanup_interval_max_ms));
}
void S3QueueMetadata::cleanupThreadFuncImpl()
void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds);
const auto zk_client = getZooKeeper();
const fs::path zookeeper_processed_path = zookeeper_path / "processed";
const fs::path zookeeper_failed_path = zookeeper_path / "failed";
@ -358,11 +352,11 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
return;
}
chassert(settings.s3queue_tracked_files_limit || settings.s3queue_tracked_file_ttl_sec);
const bool check_nodes_limit = settings.s3queue_tracked_files_limit > 0;
const bool check_nodes_ttl = settings.s3queue_tracked_file_ttl_sec > 0;
chassert(settings.tracked_files_limit || settings.tracked_file_ttl_sec);
const bool check_nodes_limit = settings.tracked_files_limit > 0;
const bool check_nodes_ttl = settings.tracked_file_ttl_sec > 0;
const bool nodes_limit_exceeded = nodes_num > settings.s3queue_tracked_files_limit;
const bool nodes_limit_exceeded = nodes_num > settings.tracked_files_limit;
if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl)
{
LOG_TEST(log, "No limit exceeded");
@ -384,7 +378,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
struct Node
{
std::string zk_path;
S3QueueIFileMetadata::NodeMetadata metadata;
ObjectStorageQueueIFileMetadata::NodeMetadata metadata;
};
auto node_cmp = [](const Node & a, const Node & b)
{
@ -405,7 +399,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
std::string metadata_str;
if (zk_client->tryGet(path, metadata_str))
{
sorted_nodes.emplace(path, S3QueueIFileMetadata::NodeMetadata::fromString(metadata_str));
sorted_nodes.emplace(path, ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", path);
}
else
@ -435,9 +429,9 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp);
return wb.str();
};
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", settings.s3queue_tracked_files_limit, settings.s3queue_tracked_file_ttl_sec, get_nodes_str());
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", settings.tracked_files_limit, settings.tracked_file_ttl_sec, get_nodes_str());
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - settings.s3queue_tracked_files_limit : 0;
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - settings.tracked_files_limit : 0;
for (const auto & node : sorted_nodes)
{
if (nodes_to_remove)
@ -456,7 +450,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
else if (check_nodes_ttl)
{
UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
if (node_age >= settings.s3queue_tracked_file_ttl_sec)
if (node_age >= settings.tracked_file_ttl_sec)
{
LOG_TRACE(log, "Removing node at path {} ({}) because file ttl is reached",
node.metadata.file_path, node.zk_path);

View File

@ -7,23 +7,23 @@
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
namespace DB
{
struct S3QueueSettings;
class StorageS3Queue;
struct S3QueueTableMetadata;
struct ObjectStorageQueueSettings;
class StorageObjectStorageQueue;
struct ObjectStorageQueueTableMetadata;
struct StorageInMemoryMetadata;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
/**
* A class for managing S3Queue metadata in zookeeper, e.g.
* A class for managing ObjectStorageQueue metadata in zookeeper, e.g.
* the following folders:
* - <path_to_metadata>/processed
* - <path_to_metadata>/processing
@ -35,7 +35,7 @@ using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
* - <path_to_metadata>/processing
* - <path_to_metadata>/failed
*
* Depending on S3Queue processing mode (ordered or unordered)
* Depending on ObjectStorageQueue processing mode (ordered or unordered)
* we can differently store metadata in /processed node.
*
* Implements caching of zookeeper metadata for faster responses.
@ -44,24 +44,24 @@ using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
* In case of Unordered mode - if files TTL is enabled or maximum tracked files limit is set
* starts a background cleanup thread which is responsible for maintaining them.
*/
class S3QueueMetadata
class ObjectStorageQueueMetadata
{
public:
using FileStatus = S3QueueIFileMetadata::FileStatus;
using FileMetadataPtr = std::shared_ptr<S3QueueIFileMetadata>;
using FileStatus = ObjectStorageQueueIFileMetadata::FileStatus;
using FileMetadataPtr = std::shared_ptr<ObjectStorageQueueIFileMetadata>;
using FileStatusPtr = std::shared_ptr<FileStatus>;
using FileStatuses = std::unordered_map<std::string, FileStatusPtr>;
using Bucket = size_t;
using Processor = std::string;
S3QueueMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_);
~S3QueueMetadata();
ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_);
~ObjectStorageQueueMetadata();
void initialize(const ConfigurationPtr & configuration, const StorageInMemoryMetadata & storage_metadata);
void checkSettings(const S3QueueSettings & settings) const;
void checkSettings(const ObjectStorageQueueSettings & settings) const;
void shutdown();
FileMetadataPtr getFileMetadata(const std::string & path, S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
FileStatusPtr getFileStatus(const std::string & path);
FileStatuses getFileStatuses() const;
@ -69,16 +69,16 @@ public:
/// Method of Ordered mode parallel processing.
bool useBucketsForProcessing() const;
Bucket getBucketForPath(const std::string & path) const;
S3QueueOrderedFileMetadata::BucketHolderPtr tryAcquireBucket(const Bucket & bucket, const Processor & processor);
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr tryAcquireBucket(const Bucket & bucket, const Processor & processor);
static size_t getBucketsNum(const S3QueueSettings & settings);
static size_t getBucketsNum(const S3QueueTableMetadata & settings);
static size_t getBucketsNum(const ObjectStorageQueueSettings & settings);
static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & settings);
private:
void cleanupThreadFunc();
void cleanupThreadFuncImpl();
const S3QueueSettings settings;
const ObjectStorageQueueSettings settings;
const fs::path zookeeper_path;
const size_t buckets_num;

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Interpreters/Context.h>
namespace DB
@ -8,20 +8,20 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
S3QueueMetadataFactory & S3QueueMetadataFactory::instance()
ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance()
{
static S3QueueMetadataFactory ret;
static ObjectStorageQueueMetadataFactory ret;
return ret;
}
S3QueueMetadataFactory::FilesMetadataPtr
S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings)
ObjectStorageQueueMetadataFactory::FilesMetadataPtr
ObjectStorageQueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
{
auto files_metadata = std::make_shared<S3QueueMetadata>(zookeeper_path, settings);
auto files_metadata = std::make_shared<ObjectStorageQueueMetadata>(zookeeper_path, settings);
it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first;
}
else
@ -32,7 +32,7 @@ S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3
return it->second.metadata;
}
void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
@ -57,9 +57,9 @@ void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
}
}
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> S3QueueMetadataFactory::getAll()
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
{
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> result;
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path)
result.emplace(zk_path, metadata_and_ref_count.metadata);
return result;

View File

@ -0,0 +1,37 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
namespace DB
{
class ObjectStorageQueueMetadataFactory final : private boost::noncopyable
{
public:
using FilesMetadataPtr = std::shared_ptr<ObjectStorageQueueMetadata>;
static ObjectStorageQueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings);
void remove(const std::string & zookeeper_path);
std::unordered_map<std::string, FilesMetadataPtr> getAll();
private:
struct Metadata
{
explicit Metadata(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
std::shared_ptr<ObjectStorageQueueMetadata> metadata;
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
size_t ref_count = 0;
};
using MetadataByPath = std::unordered_map<std::string, Metadata>;
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}
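
A hedged usage sketch (the path literal and `settings` variable are made up): because the factory keys metadata by zookeeper path, two tables created with the same `keeper_path` share one `ObjectStorageQueueMetadata` instance and thus one view of file statuses.

```cpp
auto metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(
    "/clickhouse/object_storage_queue/table1", settings);
/// ... and on DROP TABLE the reference is returned:
ObjectStorageQueueMetadataFactory::instance().remove("/clickhouse/object_storage_queue/table1");
```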

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Common/SipHash.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
@ -16,7 +16,7 @@ namespace ErrorCodes
namespace
{
S3QueueOrderedFileMetadata::Bucket getBucketForPathImpl(const std::string & path, size_t buckets_num)
ObjectStorageQueueOrderedFileMetadata::Bucket getBucketForPathImpl(const std::string & path, size_t buckets_num)
{
return sipHash64(path) % buckets_num;
}
@ -40,7 +40,7 @@ namespace
}
}
S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
ObjectStorageQueueOrderedFileMetadata::BucketHolder::BucketHolder(
const Bucket & bucket_,
int bucket_version_,
const std::string & bucket_lock_path_,
@ -57,7 +57,7 @@ S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
{
}
void S3QueueOrderedFileMetadata::BucketHolder::release()
void ObjectStorageQueueOrderedFileMetadata::BucketHolder::release()
{
if (released)
return;
@ -89,7 +89,7 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
zkutil::KeeperMultiException::check(code, requests, responses);
}
S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
ObjectStorageQueueOrderedFileMetadata::BucketHolder::~BucketHolder()
{
if (!released)
LOG_TEST(log, "Releasing bucket ({}) holder in destructor", bucket_info->bucket);
@ -104,7 +104,7 @@ S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
}
}
S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
ObjectStorageQueueOrderedFileMetadata::ObjectStorageQueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
@ -112,7 +112,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
size_t buckets_num_,
size_t max_loading_retries_,
LoggerPtr log_)
: S3QueueIFileMetadata(
: ObjectStorageQueueIFileMetadata(
path_,
/* processing_node_path */zk_path_ / "processing" / getNodeName(path_),
/* processed_node_path */getProcessedPath(zk_path_, path_, buckets_num_),
@ -126,7 +126,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
{
}
std::vector<std::string> S3QueueOrderedFileMetadata::getMetadataPaths(size_t buckets_num)
std::vector<std::string> ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(size_t buckets_num)
{
if (buckets_num > 1)
{
@ -139,7 +139,7 @@ std::vector<std::string> S3QueueOrderedFileMetadata::getMetadataPaths(size_t buc
return {"failed", "processing"};
}
bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
bool ObjectStorageQueueOrderedFileMetadata::getMaxProcessedFile(
NodeMetadata & result,
Coordination::Stat * stat,
const zkutil::ZooKeeperPtr & zk_client)
@ -147,7 +147,7 @@ bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
return getMaxProcessedFile(result, stat, processed_node_path, zk_client);
}
bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
bool ObjectStorageQueueOrderedFileMetadata::getMaxProcessedFile(
NodeMetadata & result,
Coordination::Stat * stat,
const std::string & processed_node_path_,
@ -163,12 +163,12 @@ bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
return false;
}
S3QueueOrderedFileMetadata::Bucket S3QueueOrderedFileMetadata::getBucketForPath(const std::string & path_, size_t buckets_num)
ObjectStorageQueueOrderedFileMetadata::Bucket ObjectStorageQueueOrderedFileMetadata::getBucketForPath(const std::string & path_, size_t buckets_num)
{
return getBucketForPathImpl(path_, buckets_num);
}
S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket(
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(
const std::filesystem::path & zk_path,
const Bucket & bucket,
const Processor & processor,
@ -190,7 +190,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
bucket_lock_id_path, processor_info, zkutil::CreateMode::Persistent, /* ignore_if_exists */true));
/// Update bucket lock id path. We use its version as a version of ephemeral bucket lock node.
/// (See comment near S3QueueIFileMetadata::processing_node_version).
/// (See comment near ObjectStorageQueueIFileMetadata::processing_node_version).
requests.push_back(zkutil::makeSetRequest(bucket_lock_id_path, processor_info, -1));
Coordination::Responses responses;
@ -223,7 +223,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", code);
}
std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetadata::setProcessingImpl()
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueOrderedFileMetadata::setProcessingImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType
@ -319,7 +319,7 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetad
}
}
void S3QueueOrderedFileMetadata::setProcessedAtStartRequests(
void ObjectStorageQueueOrderedFileMetadata::setProcessedAtStartRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client)
{
@ -337,7 +337,7 @@ void S3QueueOrderedFileMetadata::setProcessedAtStartRequests(
}
}
void S3QueueOrderedFileMetadata::setProcessedRequests(
void ObjectStorageQueueOrderedFileMetadata::setProcessedRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client,
const std::string & processed_node_path_,
@ -378,7 +378,7 @@ void S3QueueOrderedFileMetadata::setProcessedRequests(
}
}
void S3QueueOrderedFileMetadata::setProcessedImpl()
void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType

View File

@ -1,5 +1,5 @@
#pragma once
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <filesystem>
@ -7,7 +7,7 @@
namespace DB
{
class S3QueueOrderedFileMetadata : public S3QueueIFileMetadata
class ObjectStorageQueueOrderedFileMetadata : public ObjectStorageQueueIFileMetadata
{
public:
using Processor = std::string;
@ -21,7 +21,7 @@ public:
};
using BucketInfoPtr = std::shared_ptr<const BucketInfo>;
explicit S3QueueOrderedFileMetadata(
explicit ObjectStorageQueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
@ -39,7 +39,7 @@ public:
const Processor & processor,
LoggerPtr log_);
static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
static ObjectStorageQueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
static std::vector<std::string> getMetadataPaths(size_t buckets_num);
@ -73,7 +73,7 @@ private:
bool ignore_if_exists);
};
struct S3QueueOrderedFileMetadata::BucketHolder : private boost::noncopyable
struct ObjectStorageQueueOrderedFileMetadata::BucketHolder : private boost::noncopyable
{
BucketHolder(
const Bucket & bucket_,

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Common/Exception.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
@ -13,14 +13,23 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(ObjectStorageQueueSettingsTraits, LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS)
void S3QueueSettings::loadFromQuery(ASTStorage & storage_def)
void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
/// We support settings starting with s3queue_ for compatibility.
for (auto & change : storage_def.settings->changes)
{
if (change.name.starts_with("s3queue_"))
change.name = change.name.substr(std::strlen("s3queue_"));
if (change.name == "enable_logging_to_s3queue_log")
change.name = "enable_logging_to_queue_log";
}
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)

View File

@ -0,0 +1,51 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class ASTStorage;
#define OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(ObjectStorageQueueMode, \
mode, \
ObjectStorageQueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \
M(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep the file after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, loading_retries, 10, "Retry loading up to specified number of times", 0) \
M(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, enable_logging_to_queue_log, 1, "Enable logging to system table system.(s3/azure_)queue_log", 0) \
M(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt32, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt32, polling_backoff_ms, 1000, "Polling backoff", 0) \
M(UInt32, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
M(UInt32, max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
M(UInt32, max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
M(UInt32, max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
M(UInt32, max_processing_time_sec_before_commit, 0, "Timeout in seconds after which processed files are committed to keeper", 0) \
#define LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS(M, ALIAS) \
OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(ObjectStorageQueueSettingsTraits, LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS)
struct ObjectStorageQueueSettings : public BaseSettings<ObjectStorageQueueSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
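
A short sketch of how these settings are consumed (mirrors `ObjectStorageQueueMetadata` above; `storage_def` is whatever `ASTStorage` the CREATE TABLE query produced):

```cpp
ObjectStorageQueueSettings queue_settings;
queue_settings.loadFromQuery(storage_def); /// applies SETTINGS ..., stripping any legacy s3queue_ prefix

if (queue_settings.mode == ObjectStorageQueueMode::UNORDERED
    && (queue_settings.tracked_files_limit || queue_settings.tracked_file_ttl_sec))
{
    /// Unordered mode with limits: the background cleanup task will be scheduled.
}
```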

View File

@ -5,20 +5,14 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
#include <Storages/VirtualColumnUtils.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace CurrentMetrics
{
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
}
namespace ProfileEvents
{
extern const Event S3QueuePullMicroseconds;
extern const Event ObjectStorageQueuePullMicroseconds;
}
namespace DB
@ -26,25 +20,24 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
ObjectStorageQueueSource::ObjectStorageQueueObjectInfo::ObjectStorageQueueObjectInfo(
const Source::ObjectInfo & object_info,
S3QueueMetadata::FileMetadataPtr file_metadata_)
ObjectStorageQueueMetadata::FileMetadataPtr file_metadata_)
: Source::ObjectInfo(object_info.relative_path, object_info.metadata)
, file_metadata(file_metadata_)
{
}
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
ObjectStorageQueueSource::FileIterator::FileIterator(
std::shared_ptr<ObjectStorageQueueMetadata> metadata_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_)
: StorageObjectStorageSource::IIterator("S3QueueIterator")
: StorageObjectStorageSource::IIterator("ObjectStorageQueueIterator")
, metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
@ -52,7 +45,7 @@ StorageS3QueueSource::FileIterator::FileIterator(
{
}
bool StorageS3QueueSource::FileIterator::isFinished() const
bool ObjectStorageQueueSource::FileIterator::isFinished() const
{
LOG_TEST(log, "Iterator finished: {}, objects to retry: {}", iterator_finished, objects_to_retry.size());
return iterator_finished
@ -60,15 +53,15 @@ bool StorageS3QueueSource::FileIterator::isFinished() const
&& objects_to_retry.empty();
}
size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
size_t ObjectStorageQueueSource::FileIterator::estimatedKeysCount()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimatedKeysCount is not implemented");
}
StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
ObjectStorageQueueSource::Source::ObjectInfoPtr ObjectStorageQueueSource::FileIterator::nextImpl(size_t processor)
{
Source::ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called)
{
@ -107,12 +100,12 @@ StorageS3QueueSource::Source::ObjectInfoPtr StorageS3QueueSource::FileIterator::
auto file_metadata = metadata->getFileMetadata(object_info->relative_path, bucket_info);
if (file_metadata->setProcessing())
return std::make_shared<S3QueueObjectInfo>(*object_info, file_metadata);
return std::make_shared<ObjectStorageQueueObjectInfo>(*object_info, file_metadata);
}
return {};
}
void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr object_info)
void ObjectStorageQueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr object_info)
{
chassert(object_info);
if (metadata->useBucketsForProcessing())
@ -126,7 +119,7 @@ void StorageS3QueueSource::FileIterator::returnForRetry(Source::ObjectInfoPtr ob
}
}
void StorageS3QueueSource::FileIterator::releaseFinishedBuckets()
void ObjectStorageQueueSource::FileIterator::releaseFinishedBuckets()
{
for (const auto & [processor, holders] : bucket_holders)
{
@ -157,8 +150,8 @@ void StorageS3QueueSource::FileIterator::releaseFinishedBuckets()
}
}
std::pair<StorageS3QueueSource::Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
std::pair<ObjectStorageQueueSource::Source::ObjectInfoPtr, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr>
ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
auto bucket_holder_it = bucket_holders.emplace(processor, std::vector<BucketHolderPtr>{}).first;
BucketHolder * current_bucket_holder = bucket_holder_it->second.empty() || bucket_holder_it->second.back()->isFinished()
@ -234,7 +227,7 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
}
}
/// If processing thread has already acquired some bucket
/// and while listing s3 directory gets a key which is in a different bucket,
/// and while listing object storage directory gets a key which is in a different bucket,
/// it puts the key into listed_keys_cache to allow others to process it,
/// because one processing thread can acquire only one bucket at a time.
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
@ -361,19 +354,19 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
}
}
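As a reading aid for the comment above, a minimal sketch of the routing rule, assuming a hash-mod bucket assignment (the real computation and cache layout live in the metadata classes, not in this hunk):

```python
# Illustrative only: bucket assignment and cache shape are stand-ins for the
# real ObjectStorageQueueMetadata logic.
def route_key(key, my_bucket, buckets_num, listed_keys_cache):
    bucket = hash(key) % buckets_num  # assumption: some stable key-to-bucket hash
    if bucket == my_bucket:
        return key  # this thread owns the bucket, so it processes the key itself
    # Park the key for whichever thread later acquires that bucket.
    listed_keys_cache.setdefault(bucket, []).append(key)
    return None
```

Once a thread finishes its own bucket, it acquires one of the cached buckets and drains the parked keys, exactly as the comment above describes.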
StorageS3QueueSource::StorageS3QueueSource(
ObjectStorageQueueSource::ObjectStorageQueueSource(
String name_,
size_t processor_id_,
const Block & header_,
std::unique_ptr<StorageObjectStorageSource> internal_source_,
std::shared_ptr<S3QueueMetadata> files_metadata_,
const S3QueueAction & action_,
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata_,
const ObjectStorageQueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> system_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_,
size_t max_processed_files_before_commit_,
@ -391,7 +384,7 @@ StorageS3QueueSource::StorageS3QueueSource(
, requested_virtual_columns(requested_virtual_columns_)
, shutdown_called(shutdown_called_)
, table_is_being_dropped(table_is_being_dropped_)
, s3_queue_log(s3_queue_log_)
, system_queue_log(system_queue_log_)
, storage_id(storage_id_)
, max_processed_files_before_commit(max_processed_files_before_commit_)
, max_processed_rows_before_commit(max_processed_rows_before_commit_)
@ -403,12 +396,12 @@ StorageS3QueueSource::StorageS3QueueSource(
{
}
String StorageS3QueueSource::getName() const
String ObjectStorageQueueSource::getName() const
{
return name;
}
void StorageS3QueueSource::lazyInitialize(size_t processor)
void ObjectStorageQueueSource::lazyInitialize(size_t processor)
{
if (initialized)
return;
@ -423,7 +416,7 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
initialized = true;
}
Chunk StorageS3QueueSource::generate()
Chunk ObjectStorageQueueSource::generate()
{
Chunk chunk;
try
@ -445,7 +438,7 @@ Chunk StorageS3QueueSource::generate()
return chunk;
}
Chunk StorageS3QueueSource::generateImpl()
Chunk ObjectStorageQueueSource::generateImpl()
{
lazyInitialize(processor_id);
@ -457,7 +450,7 @@ Chunk StorageS3QueueSource::generateImpl()
break;
}
const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(reader.getObjectInfo().get());
const auto * object_info = dynamic_cast<const ObjectStorageQueueObjectInfo *>(reader.getObjectInfo().get());
auto file_metadata = object_info->file_metadata;
auto file_status = file_metadata->getFileStatus();
@ -522,11 +515,11 @@ Chunk StorageS3QueueSource::generateImpl()
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (s3 related counters are not saved). Why?
/// FIXME: if files are compressed, profile counters update does not work fully (object storage related counters are not saved). Why?
try
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueuePullMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds);
Chunk chunk;
if (reader->pull(chunk))
@ -657,7 +650,7 @@ Chunk StorageS3QueueSource::generateImpl()
return {};
}
void StorageS3QueueSource::commit(bool success, const std::string & exception_message)
void ObjectStorageQueueSource::commit(bool success, const std::string & exception_message)
{
LOG_TEST(log, "Having {} files to set as {}, failed files: {}",
processed_files.size(), success ? "Processed" : "Failed", failed_during_read_files.size());
@ -687,33 +680,33 @@ void StorageS3QueueSource::commit(bool success, const std::string & exception_me
}
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{
switch (action)
{
case S3QueueAction::DELETE:
case ObjectStorageQueueAction::DELETE:
{
assert(remove_file_func);
remove_file_func(path);
break;
}
case S3QueueAction::KEEP:
case ObjectStorageQueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::appendLogElement(
void ObjectStorageQueueSource::appendLogElement(
const std::string & filename,
S3QueueMetadata::FileStatus & file_status_,
ObjectStorageQueueMetadata::FileStatus & file_status_,
size_t processed_rows,
bool processed)
{
if (!s3_queue_log)
if (!system_queue_log)
return;
S3QueueLogElement elem{};
ObjectStorageQueueLogElement elem{};
{
elem = S3QueueLogElement
elem = ObjectStorageQueueLogElement
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.database = storage_id.database_name,
@ -721,14 +714,14 @@ void StorageS3QueueSource::appendLogElement(
.uuid = toString(storage_id.uuid),
.file_name = filename,
.rows_processed = processed_rows,
.status = processed ? S3QueueLogElement::S3QueueStatus::Processed : S3QueueLogElement::S3QueueStatus::Failed,
.status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
.counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(),
.processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time,
.exception = file_status_.getException(),
};
}
s3_queue_log->add(std::move(elem));
system_queue_log->add(std::move(elem));
}
}

View File

@ -3,10 +3,10 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Processors/ISource.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
namespace Poco { class Logger; }
@ -16,29 +16,29 @@ namespace DB
struct ObjectMetadata;
class StorageS3QueueSource : public ISource, WithContext
class ObjectStorageQueueSource : public ISource, WithContext
{
public:
using Storage = StorageObjectStorage;
using Source = StorageObjectStorageSource;
using RemoveFileFunc = std::function<void(std::string)>;
using BucketHolderPtr = S3QueueOrderedFileMetadata::BucketHolderPtr;
using BucketHolder = S3QueueOrderedFileMetadata::BucketHolder;
using BucketHolderPtr = ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr;
using BucketHolder = ObjectStorageQueueOrderedFileMetadata::BucketHolder;
struct S3QueueObjectInfo : public Source::ObjectInfo
struct ObjectStorageQueueObjectInfo : public Source::ObjectInfo
{
S3QueueObjectInfo(
ObjectStorageQueueObjectInfo(
const Source::ObjectInfo & object_info,
S3QueueMetadata::FileMetadataPtr file_metadata_);
ObjectStorageQueueMetadata::FileMetadataPtr file_metadata_);
S3QueueMetadata::FileMetadataPtr file_metadata;
ObjectStorageQueueMetadata::FileMetadataPtr file_metadata;
};
class FileIterator : public StorageObjectStorageSource::IIterator
{
public:
FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::shared_ptr<ObjectStorageQueueMetadata> metadata_,
std::unique_ptr<Source::GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
@ -63,10 +63,10 @@ public:
void releaseFinishedBuckets();
private:
using Bucket = S3QueueMetadata::Bucket;
using Processor = S3QueueMetadata::Processor;
using Bucket = ObjectStorageQueueMetadata::Bucket;
using Processor = ObjectStorageQueueMetadata::Processor;
const std::shared_ptr<S3QueueMetadata> metadata;
const std::shared_ptr<ObjectStorageQueueMetadata> metadata;
const std::unique_ptr<Source::GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
@ -90,23 +90,23 @@ public:
/// Only for processing without buckets.
std::deque<Source::ObjectInfoPtr> objects_to_retry;
std::pair<Source::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
std::pair<Source::ObjectInfoPtr, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
bool hasKeysForProcessor(const Processor & processor) const;
};
StorageS3QueueSource(
ObjectStorageQueueSource(
String name_,
size_t processor_id_,
const Block & header_,
std::unique_ptr<StorageObjectStorageSource> internal_source_,
std::shared_ptr<S3QueueMetadata> files_metadata_,
const S3QueueAction & action_,
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata_,
const ObjectStorageQueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> system_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_,
size_t max_processed_files_before_commit_,
@ -128,13 +128,13 @@ public:
private:
const String name;
const size_t processor_id;
const S3QueueAction action;
const std::shared_ptr<S3QueueMetadata> files_metadata;
const ObjectStorageQueueAction action;
const std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;
const std::shared_ptr<StorageObjectStorageSource> internal_source;
const NamesAndTypesList requested_virtual_columns;
const std::atomic<bool> & shutdown_called;
const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const std::shared_ptr<ObjectStorageQueueLog> system_queue_log;
const StorageID storage_id;
const size_t max_processed_files_before_commit;
const size_t max_processed_rows_before_commit;
@ -145,8 +145,8 @@ private:
RemoveFileFunc remove_file_func;
LoggerPtr log;
std::vector<S3QueueMetadata::FileMetadataPtr> processed_files;
std::vector<S3QueueMetadata::FileMetadataPtr> failed_during_read_files;
std::vector<ObjectStorageQueueMetadata::FileMetadataPtr> processed_files;
std::vector<ObjectStorageQueueMetadata::FileMetadataPtr> failed_during_read_files;
Source::ReaderHolder reader;
std::future<Source::ReaderHolder> reader_future;
@ -160,7 +160,7 @@ private:
Chunk generateImpl();
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void lazyInitialize(size_t processor);
};

View File

@ -3,9 +3,9 @@
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -20,33 +20,33 @@ namespace ErrorCodes
namespace
{
S3QueueMode modeFromString(const std::string & mode)
ObjectStorageQueueMode modeFromString(const std::string & mode)
{
if (mode == "ordered")
return S3QueueMode::ORDERED;
return ObjectStorageQueueMode::ORDERED;
if (mode == "unordered")
return S3QueueMode::UNORDERED;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected S3Queue mode: {}", mode);
return ObjectStorageQueueMode::UNORDERED;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected ObjectStorageQueue mode: {}", mode);
}
}
S3QueueTableMetadata::S3QueueTableMetadata(
ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const S3QueueSettings & engine_settings,
const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
buckets = engine_settings.s3queue_buckets;
processing_threads_num = engine_settings.s3queue_processing_threads_num;
tracked_files_limit = engine_settings.tracked_files_limit;
tracked_file_ttl_sec = engine_settings.tracked_file_ttl_sec;
buckets = engine_settings.buckets;
processing_threads_num = engine_settings.processing_threads_num;
columns = storage_metadata.getColumns().toString();
}
String S3QueueTableMetadata::toString() const
String ObjectStorageQueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
@ -65,7 +65,7 @@ String S3QueueTableMetadata::toString() const
return oss.str();
}
void S3QueueTableMetadata::read(const String & metadata_str)
void ObjectStorageQueueTableMetadata::read(const String & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
@ -102,19 +102,19 @@ void S3QueueTableMetadata::read(const String & metadata_str)
buckets = json->getValue<UInt64>("buckets");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const String & metadata_str)
{
S3QueueTableMetadata metadata;
ObjectStorageQueueTableMetadata metadata;
metadata.read(metadata_str);
return metadata;
}
void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const
{
checkImmutableFieldsEquals(from_zk);
}
void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const
{
if (after_processing != from_zk.after_processing)
throw Exception(
@ -164,29 +164,29 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
from_zk.last_processed_path,
last_processed_path);
if (modeFromString(mode) == S3QueueMode::ORDERED)
if (modeFromString(mode) == ObjectStorageQueueMode::ORDERED)
{
if (buckets != from_zk.buckets)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. "
"Existing table metadata in ZooKeeper differs in buckets setting. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.buckets, buckets);
}
if (S3QueueMetadata::getBucketsNum(*this) != S3QueueMetadata::getBucketsNum(from_zk))
if (ObjectStorageQueueMetadata::getBucketsNum(*this) != ObjectStorageQueueMetadata::getBucketsNum(from_zk))
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}",
S3QueueMetadata::getBucketsNum(*this), S3QueueMetadata::getBucketsNum(from_zk));
ObjectStorageQueueMetadata::getBucketsNum(*this), ObjectStorageQueueMetadata::getBucketsNum(from_zk));
}
}
}
void S3QueueTableMetadata::checkEquals(const S3QueueSettings & current, const S3QueueSettings & expected)
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected)
{
if (current.after_processing != expected.after_processing)
throw Exception(
@ -204,48 +204,48 @@ void S3QueueTableMetadata::checkEquals(const S3QueueSettings & current, const S3
expected.mode.toString(),
current.mode.toString());
if (current.s3queue_tracked_files_limit != expected.s3queue_tracked_files_limit)
if (current.tracked_files_limit != expected.tracked_files_limit)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set size. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_tracked_files_limit,
current.s3queue_tracked_files_limit);
expected.tracked_files_limit,
current.tracked_files_limit);
if (current.s3queue_tracked_file_ttl_sec != expected.s3queue_tracked_file_ttl_sec)
if (current.tracked_file_ttl_sec != expected.tracked_file_ttl_sec)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set age. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_tracked_file_ttl_sec,
current.s3queue_tracked_file_ttl_sec);
expected.tracked_file_ttl_sec,
current.tracked_file_ttl_sec);
if (current.s3queue_last_processed_path.value != expected.s3queue_last_processed_path.value)
if (current.last_processed_path.value != expected.last_processed_path.value)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in last_processed_path. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_last_processed_path.value,
current.s3queue_last_processed_path.value);
expected.last_processed_path.value,
current.last_processed_path.value);
if (current.mode == S3QueueMode::ORDERED)
if (current.mode == ObjectStorageQueueMode::ORDERED)
{
if (current.s3queue_buckets != expected.s3queue_buckets)
if (current.buckets != expected.buckets)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. "
"Existing table metadata in ZooKeeper differs in buckets setting. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_buckets, current.s3queue_buckets);
expected.buckets, current.buckets);
}
if (S3QueueMetadata::getBucketsNum(current) != S3QueueMetadata::getBucketsNum(expected))
if (ObjectStorageQueueMetadata::getBucketsNum(current) != ObjectStorageQueueMetadata::getBucketsNum(expected))
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}",
S3QueueMetadata::getBucketsNum(current), S3QueueMetadata::getBucketsNum(expected));
ObjectStorageQueueMetadata::getBucketsNum(current), ObjectStorageQueueMetadata::getBucketsNum(expected));
}
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <base/types.h>
@ -11,10 +11,10 @@ namespace DB
class WriteBuffer;
class ReadBuffer;
/** The basic parameters of S3Queue table engine for saving in ZooKeeper.
/** The basic parameters of ObjectStorageQueue table engine for saving in ZooKeeper.
* Lets you verify that they match local ones.
*/
struct S3QueueTableMetadata
struct ObjectStorageQueueTableMetadata
{
String format_name;
String columns;
@ -26,22 +26,22 @@ struct S3QueueTableMetadata
UInt64 processing_threads_num = 1;
String last_processed_path;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(
ObjectStorageQueueTableMetadata() = default;
ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const S3QueueSettings & engine_settings,
const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata);
void read(const String & metadata_str);
static S3QueueTableMetadata parse(const String & metadata_str);
static ObjectStorageQueueTableMetadata parse(const String & metadata_str);
String toString() const;
void checkEquals(const S3QueueTableMetadata & from_zk) const;
static void checkEquals(const S3QueueSettings & current, const S3QueueSettings & expected);
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
static void checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected);
private:
void checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const;
void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
};

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueUnorderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context.h>
@ -18,13 +18,13 @@ namespace
}
}
S3QueueUnorderedFileMetadata::S3QueueUnorderedFileMetadata(
ObjectStorageQueueUnorderedFileMetadata::ObjectStorageQueueUnorderedFileMetadata(
const std::filesystem::path & zk_path,
const std::string & path_,
FileStatusPtr file_status_,
size_t max_loading_retries_,
LoggerPtr log_)
: S3QueueIFileMetadata(
: ObjectStorageQueueIFileMetadata(
path_,
/* processing_node_path */zk_path / "processing" / getNodeName(path_),
/* processed_node_path */zk_path / "processed" / getNodeName(path_),
@ -35,7 +35,7 @@ S3QueueUnorderedFileMetadata::S3QueueUnorderedFileMetadata(
{
}
std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueUnorderedFileMetadata::setProcessingImpl()
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueUnorderedFileMetadata::setProcessingImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType
@ -89,7 +89,7 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueUnorderedFileMet
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code));
}
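The "in one zookeeper transaction" comments above refer to ZooKeeper multi-ops; since the actual request lists are elided from these hunks, the following kazoo sketch only illustrates the all-or-nothing pattern, with invented paths and payloads:

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")  # assumption: a local ZooKeeper
zk.start()
zk.ensure_path("/queue/processing")

tx = zk.transaction()
# Both requests commit atomically: either the ephemeral "processing" node is
# created and the parent is touched, or neither change is applied, which is
# what makes marking a file as processing safe under concurrent consumers.
tx.create("/queue/processing/file-node", b"node metadata", ephemeral=True)
tx.set_data("/queue/processing", b"")
results = tx.commit()  # failed requests come back as exception objects in the list
zk.stop()
```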
void S3QueueUnorderedFileMetadata::setProcessedAtStartRequests(
void ObjectStorageQueueUnorderedFileMetadata::setProcessedAtStartRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr &)
{
@ -98,7 +98,7 @@ void S3QueueUnorderedFileMetadata::setProcessedAtStartRequests(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
}
void S3QueueUnorderedFileMetadata::setProcessedImpl()
void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType

View File

@ -1,17 +1,17 @@
#pragma once
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <filesystem>
#include <Common/logger_useful.h>
namespace DB
{
class S3QueueUnorderedFileMetadata : public S3QueueIFileMetadata
class ObjectStorageQueueUnorderedFileMetadata : public ObjectStorageQueueIFileMetadata
{
public:
using Bucket = size_t;
explicit S3QueueUnorderedFileMetadata(
explicit ObjectStorageQueueUnorderedFileMetadata(
const std::filesystem::path & zk_path,
const std::string & path_,
FileStatusPtr file_status_,

View File

@ -1,10 +1,7 @@
#include <optional>
#include "config.h"
#include <Common/ProfileEvents.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTFunction.h>
@ -15,16 +12,15 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Processors/Sources/NullSource.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/Utils.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -33,12 +29,6 @@
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
@ -46,23 +36,22 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
extern const int QUERY_NOT_ALLOWED;
}
namespace
{
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const S3QueueSettings & s3queue_settings)
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const ObjectStorageQueueSettings & queue_settings)
{
std::string zk_path_prefix = settings.s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings.keeper_path.changed)
if (queue_settings.keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings.keeper_path.value;
result_zk_path = fs::path(zk_path_prefix) / queue_settings.keeper_path.value;
}
else
{
@ -73,49 +62,66 @@ namespace
}
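A rough Python rendering of the branch of `chooseZooKeeperPath` that is visible above (the fallback branch is elided from the hunk and deliberately left unimplemented here):

```python
from pathlib import PurePosixPath

def choose_zookeeper_path(default_prefix, keeper_path_changed, keeper_path):
    # An empty default prefix degrades to "/"; an explicitly set keeper_path is
    # appended to it, deliberately without the table uuid (see the comment above).
    prefix = default_prefix if default_prefix else "/"
    if keeper_path_changed:
        return str(PurePosixPath(prefix) / keeper_path)
    raise NotImplementedError("the default-path branch is elided from this hunk")
```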
void checkAndAdjustSettings(
S3QueueSettings & s3queue_settings,
const Settings & settings,
ObjectStorageQueueSettings & queue_settings,
ASTStorage * engine_args,
bool is_attach,
const LoggerPtr & log,
ASTStorage * engine_args)
const LoggerPtr & log)
{
if (!is_attach && !s3queue_settings.mode.changed)
if (!is_attach && !queue_settings.mode.changed)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `mode` (Unordered/Ordered) is not specified, but is required.");
}
/// In case of attach, we leave Ordered mode as the default for compatibility.
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
if (!queue_settings.processing_threads_num)
{
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `processing_threads_num` cannot be set to zero");
}
if (s3queue_settings.s3queue_cleanup_interval_min_ms > s3queue_settings.s3queue_cleanup_interval_max_ms)
if (queue_settings.cleanup_interval_min_ms > queue_settings.cleanup_interval_max_ms)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
"Setting `cleanup_interval_min_ms` ({}) must be less than or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
}
if (!s3queue_settings.s3queue_processing_threads_num)
if (!is_attach && !queue_settings.processing_threads_num.changed)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
}
if (!is_attach && !s3queue_settings.s3queue_processing_threads_num.changed)
{
s3queue_settings.s3queue_processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
queue_settings.processing_threads_num = std::max<uint32_t>(getNumberOfPhysicalCPUCores(), 16);
engine_args->settings->as<ASTSetQuery>()->changes.insertSetting(
"s3queue_processing_threads_num",
s3queue_settings.s3queue_processing_threads_num.value);
"processing_threads_num",
queue_settings.processing_threads_num.value);
LOG_TRACE(log, "Set `processing_threads_num` to {}", s3queue_settings.s3queue_processing_threads_num);
LOG_TRACE(log, "Set `processing_threads_num` to {}", queue_settings.processing_threads_num);
}
}
std::shared_ptr<ObjectStorageQueueLog> getQueueLog(const ObjectStoragePtr & storage, const ContextPtr & context, const ObjectStorageQueueSettings & table_settings)
{
const auto & settings = context->getSettingsRef();
switch (storage->getType())
{
case DB::ObjectStorageType::S3:
{
if (table_settings.enable_logging_to_queue_log || settings.s3queue_enable_logging_to_s3queue_log)
return context->getS3QueueLog();
return nullptr;
}
case DB::ObjectStorageType::Azure:
{
if (table_settings.enable_logging_to_queue_log)
return context->getAzureQueueLog();
return nullptr;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected object storage type: {}", storage->getType());
}
}
}
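For readers tracing the new log routing, `getQueueLog` above reduces to a small decision table; a sketch under the assumption that only these two settings matter:

```python
def queue_log_enabled(storage_type, table_enable_logging, s3_session_setting):
    # S3: either the table-level enable_logging_to_queue_log setting or the
    # session-level s3queue_enable_logging_to_s3queue_log turns the log on.
    # Azure: only the table-level setting is consulted.
    if storage_type == "s3":
        return table_enable_logging or s3_session_setting
    if storage_type == "azure":
        return table_enable_logging
    raise ValueError(f"unexpected object storage type: {storage_type}")
```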
StorageS3Queue::StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
StorageObjectStorageQueue::StorageObjectStorageQueue(
std::unique_ptr<ObjectStorageQueueSettings> queue_settings_,
const ConfigurationPtr configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -127,12 +133,12 @@ StorageS3Queue::StorageS3Queue(
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, queue_settings(std::move(queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *queue_settings))
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")"))
, reschedule_processing_interval_ms(queue_settings->polling_min_timeout_ms)
, log(getLogger(fmt::format("Storage{}Queue ({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
if (configuration->getPath().empty())
{
@ -144,10 +150,10 @@ StorageS3Queue::StorageS3Queue(
}
else if (!configuration->isPathWithGlobs())
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "ObjectStorageQueue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE, log, engine_args);
checkAndAdjustSettings(*queue_settings, engine_args, mode > LoadingStrictnessLevel::CREATE, log);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
@ -165,30 +171,30 @@ StorageS3Queue::StorageS3Queue(
setInMemoryMetadata(storage_metadata);
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
/// Get metadata manager from S3QueueMetadataFactory,
/// Get metadata manager from ObjectStorageQueueMetadataFactory,
/// it will increase the ref count for the metadata object.
/// The ref count is decreased when StorageS3Queue::drop() method is called.
files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
/// The ref count is decreased when StorageObjectStorageQueue::drop() method is called.
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, *queue_settings);
try
{
files_metadata->initialize(configuration_, storage_metadata);
}
catch (...)
{
S3QueueMetadataFactory::instance().remove(zk_path);
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
throw;
}
}
void StorageS3Queue::startup()
void StorageObjectStorageQueue::startup()
{
if (task)
task->activateAndSchedule();
}
void StorageS3Queue::shutdown(bool is_drop)
void StorageObjectStorageQueue::shutdown(bool is_drop)
{
table_is_being_dropped = is_drop;
shutdown_called = true;
@ -207,31 +213,31 @@ void StorageS3Queue::shutdown(bool is_drop)
LOG_TRACE(log, "Shut down storage");
}
void StorageS3Queue::drop()
void StorageObjectStorageQueue::drop()
{
S3QueueMetadataFactory::instance().remove(zk_path);
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
}
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context_, format_settings);
}
class ReadFromS3Queue : public SourceStepWithFilter
class ReadFromObjectStorageQueue : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromS3Queue"; }
std::string getName() const override { return "ReadFromObjectStorageQueue"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters(ActionDAGNodes added_filter_nodes) override;
ReadFromS3Queue(
ReadFromObjectStorageQueue(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
Block sample_block,
ReadFromFormatInfo info_,
std::shared_ptr<StorageS3Queue> storage_,
std::shared_ptr<StorageObjectStorageQueue> storage_,
size_t max_block_size_)
: SourceStepWithFilter(
DataStream{.header = std::move(sample_block)},
@ -247,15 +253,15 @@ public:
private:
ReadFromFormatInfo info;
std::shared_ptr<StorageS3Queue> storage;
std::shared_ptr<StorageObjectStorageQueue> storage;
size_t max_block_size;
std::shared_ptr<StorageS3Queue::FileIterator> iterator;
std::shared_ptr<StorageObjectStorageQueue::FileIterator> iterator;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromS3Queue::createIterator(const ActionsDAG::Node * predicate)
void ReadFromObjectStorageQueue::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator)
return;
@ -264,7 +270,7 @@ void ReadFromS3Queue::createIterator(const ActionsDAG::Node * predicate)
}
void ReadFromS3Queue::applyFilters(ActionDAGNodes added_filter_nodes)
void ReadFromObjectStorageQueue::applyFilters(ActionDAGNodes added_filter_nodes)
{
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
@ -275,7 +281,7 @@ void ReadFromS3Queue::applyFilters(ActionDAGNodes added_filter_nodes)
createIterator(predicate);
}
void StorageS3Queue::read(
void StorageObjectStorageQueue::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -297,10 +303,10 @@ void StorageS3Queue::read(
"Cannot read from {} with attached materialized views", getName());
}
auto this_ptr = std::static_pointer_cast<StorageS3Queue>(shared_from_this());
auto this_ptr = std::static_pointer_cast<StorageObjectStorageQueue>(shared_from_this());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context));
auto reading = std::make_unique<ReadFromS3Queue>(
auto reading = std::make_unique<ReadFromObjectStorageQueue>(
column_names,
query_info,
storage_snapshot,
@ -313,10 +319,10 @@ void StorageS3Queue::read(
query_plan.addStep(std::move(reading));
}
void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
void ReadFromObjectStorageQueue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = storage->s3queue_settings->s3queue_processing_threads_num;
const size_t adjusted_num_streams = storage->queue_settings->processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
@ -338,10 +344,10 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
pipeline.init(std::move(pipe));
}
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSource(
size_t processor_id,
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
std::shared_ptr<StorageObjectStorageQueue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context,
bool commit_once_processed)
@ -362,30 +368,30 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
{
object_storage->removeObject(StoredObject(path));
};
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<StorageS3QueueSource>(
return std::make_shared<ObjectStorageQueueSource>(
getName(),
processor_id,
info.source_header,
std::move(internal_source),
files_metadata,
s3queue_settings->after_processing,
queue_settings->after_processing,
file_deleter,
info.requested_virtual_columns,
local_context,
shutdown_called,
table_is_being_dropped,
s3_queue_log,
getQueueLog(object_storage, local_context, *queue_settings),
getStorageID(),
log,
s3queue_settings->s3queue_max_processed_files_before_commit,
s3queue_settings->s3queue_max_processed_rows_before_commit,
s3queue_settings->s3queue_max_processed_bytes_before_commit,
s3queue_settings->s3queue_max_processing_time_sec_before_commit,
queue_settings->max_processed_files_before_commit,
queue_settings->max_processed_rows_before_commit,
queue_settings->max_processed_bytes_before_commit,
queue_settings->max_processing_time_sec_before_commit,
commit_once_processed);
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
bool StorageObjectStorageQueue::hasDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
@ -410,7 +416,7 @@ bool StorageS3Queue::hasDependencies(const StorageID & table_id)
return true;
}
void StorageS3Queue::threadFunc()
void StorageObjectStorageQueue::threadFunc()
{
if (shutdown_called)
return;
@ -428,12 +434,12 @@ void StorageS3Queue::threadFunc()
if (streamToViews())
{
/// Reset the reschedule interval.
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
reschedule_processing_interval_ms = queue_settings->polling_min_timeout_ms;
}
else
{
/// Increase the reschedule interval.
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
reschedule_processing_interval_ms += queue_settings->polling_backoff_ms;
}
LOG_DEBUG(log, "Stopped streaming to {} attached views", dependencies_count);
@ -450,12 +456,12 @@ void StorageS3Queue::threadFunc()
if (!shutdown_called)
{
LOG_TRACE(log, "Reschedule S3 Queue processing thread in {} ms", reschedule_processing_interval_ms);
LOG_TRACE(log, "Reschedule processing thread in {} ms", reschedule_processing_interval_ms);
task->scheduleAfter(reschedule_processing_interval_ms);
}
}
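The rescheduling policy in `threadFunc` above condenses to one line; a sketch (no upper clamp is visible in this hunk, so none is modeled):

```python
def next_interval_ms(current_ms, streamed_rows, polling_min_timeout_ms, polling_backoff_ms):
    # Reset to the minimum polling timeout after a successful streamToViews(),
    # otherwise back off linearly until new files appear.
    return polling_min_timeout_ms if streamed_rows else current_ms + polling_backoff_ms
```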
bool StorageS3Queue::streamToViews()
bool StorageObjectStorageQueue::streamToViews()
{
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
@ -469,35 +475,35 @@ bool StorageS3Queue::streamToViews()
insert->table_id = table_id;
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext());
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto queue_context = Context::createCopy(getContext());
queue_context->makeQueryContext();
auto file_iterator = createFileIterator(s3queue_context, nullptr);
auto file_iterator = createFileIterator(queue_context, nullptr);
size_t total_rows = 0;
while (!shutdown_called && !file_iterator->isFinished())
{
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
InterpreterInsertQuery interpreter(insert, queue_context, false, true, true);
auto block_io = interpreter.execute();
auto read_from_format_info = prepareReadingFromFormat(
block_io.pipeline.getHeader().getNames(),
storage_snapshot,
supportsSubsetOfColumns(s3queue_context));
supportsSubsetOfColumns(queue_context));
Pipes pipes;
std::vector<std::shared_ptr<StorageS3QueueSource>> sources;
std::vector<std::shared_ptr<ObjectStorageQueueSource>> sources;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
sources.reserve(s3queue_settings->s3queue_processing_threads_num);
pipes.reserve(queue_settings->processing_threads_num);
sources.reserve(queue_settings->processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
for (size_t i = 0; i < queue_settings->processing_threads_num; ++i)
{
auto source = createSource(
i/* processor_id */,
read_from_format_info,
file_iterator,
DBMS_DEFAULT_BUFFER_SIZE,
s3queue_context,
queue_context,
false/* commit_once_processed */);
pipes.emplace_back(source);
@ -506,8 +512,8 @@ bool StorageS3Queue::streamToViews()
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);
block_io.pipeline.setNumThreads(queue_settings->processing_threads_num);
block_io.pipeline.setConcurrencyControl(queue_context->getSettingsRef().use_concurrency_control);
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
@ -536,12 +542,12 @@ bool StorageS3Queue::streamToViews()
return total_rows > 0;
}
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
zkutil::ZooKeeperPtr StorageObjectStorageQueue::getZooKeeper() const
{
return getContext()->getZooKeeper();
}
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
std::shared_ptr<StorageObjectStorageQueue::FileIterator> StorageObjectStorageQueue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
{
auto settings = configuration->getQuerySettings(local_context);
auto glob_iterator = std::make_unique<StorageObjectStorageSource::GlobIterator>(
@ -550,73 +556,4 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called, log);
}
#if USE_AWS_S3
void registerStorageS3Queue(StorageFactory & factory)
{
factory.registerStorage(
"S3Queue",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto s3queue_settings = std::make_unique<S3QueueSettings>();
if (args.storage_def->settings)
{
s3queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
else
LOG_TRACE(getLogger("StorageS3"), "Remove: {}", change.name);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageS3Queue>(
std::move(s3queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
}

View File

@ -5,25 +5,24 @@
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Storages/StorageFactory.h>
namespace DB
{
class S3QueueMetadata;
class ObjectStorageQueueMetadata;
class StorageS3Queue : public IStorage, WithContext
class StorageObjectStorageQueue : public IStorage, WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
StorageObjectStorageQueue(
std::unique_ptr<ObjectStorageQueueSettings> queue_settings_,
ConfigurationPtr configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -34,7 +33,7 @@ public:
ASTStorage * engine_args,
LoadingStrictnessLevel mode);
String getName() const override { return "S3Queue"; }
String getName() const override { return "ObjectStorageQueue"; }
void read(
QueryPlan & query_plan,
@ -53,13 +52,13 @@ public:
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
friend class ReadFromS3Queue;
using FileIterator = StorageS3QueueSource::FileIterator;
friend class ReadFromObjectStorageQueue;
using FileIterator = ObjectStorageQueueSource::FileIterator;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const std::unique_ptr<ObjectStorageQueueSettings> queue_settings;
const fs::path zk_path;
std::shared_ptr<S3QueueMetadata> files_metadata;
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;
ConfigurationPtr configuration;
ObjectStoragePtr object_storage;
@ -83,10 +82,10 @@ private:
bool supportsDynamicSubcolumns() const override { return true; }
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate);
std::shared_ptr<StorageS3QueueSource> createSource(
std::shared_ptr<ObjectStorageQueueSource> createSource(
size_t processor_id,
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
std::shared_ptr<StorageObjectStorageQueue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context,
bool commit_once_processed);

View File

@ -0,0 +1,115 @@
#include "config.h"
#include <Storages/StorageFactory.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Formats/FormatFactory.h>
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <Storages/ObjectStorage/Azure/Configuration.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
template <typename Configuration>
StoragePtr createQueueStorage(const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto queue_settings = std::make_unique<ObjectStorageQueueSettings>();
if (args.storage_def->settings)
{
queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageObjectStorageQueue>(
std::move(queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
}
#if USE_AWS_S3
void registerStorageS3Queue(StorageFactory & factory)
{
factory.registerStorage(
"S3Queue",
[](const StorageFactory::Arguments & args)
{
return createQueueStorage<StorageS3Configuration>(args);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzureQueue(StorageFactory & factory)
{
factory.registerStorage(
"AzureQueue",
[](const StorageFactory::Arguments & args)
{
return createQueueStorage<StorageAzureConfiguration>(args);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
}
#endif
}
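As a reading aid, a hedged usage sketch of the newly registered engine, modeled on the engine definition the integration test below constructs; the connection string, container, and settings are illustrative:

```python
# Illustrative DDL only; the connection string is a placeholder and "cont"
# matches the container the integration test below creates.
azure_conn = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;..."  # placeholder
create_query = f"""
CREATE TABLE test.azure_queue (column1 UInt32, column2 UInt32, column3 UInt32)
ENGINE = AzureQueue('{azure_conn}', 'cont', 'data/', 'CSV')
SETTINGS mode = 'unordered', keeper_path = '/clickhouse/test_azure_queue'
"""
print(create_query)  # feed this to any ClickHouse client, e.g. node.query(create_query)
```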

View File

@ -1,37 +0,0 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
namespace DB
{
class S3QueueMetadataFactory final : private boost::noncopyable
{
public:
using FilesMetadataPtr = std::shared_ptr<S3QueueMetadata>;
static S3QueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings);
void remove(const std::string & zookeeper_path);
std::unordered_map<std::string, FilesMetadataPtr> getAll();
private:
struct Metadata
{
explicit Metadata(std::shared_ptr<S3QueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
std::shared_ptr<S3QueueMetadata> metadata;
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
size_t ref_count = 0;
};
using MetadataByPath = std::unordered_map<std::string, Metadata>;
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}

View File

@ -1,51 +0,0 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class ASTStorage;
#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(S3QueueMode, \
mode, \
S3QueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKeeper. " \
"With ordered mode, only the max name of the successfully consumed file is stored.", \
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, s3queue_loading_retries, 10, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt32, s3queue_polling_backoff_ms, 1000, "Polling backoff", 0) \
M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
M(UInt32, s3queue_max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
M(UInt32, s3queue_max_processing_time_sec_before_commit, 0, "Timeout in seconds after which processed files are committed to keeper", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
struct S3QueueSettings : public BaseSettings<S3QueueSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}
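The file deleted above is superseded by ObjectStorageQueueSettings (its definition is not shown in this section); as the renames throughout this diff indicate, the new setting names mostly just drop the engine prefix. A sketch of the mapping, offered as a reading aid rather than an exhaustive table:

```python
def new_setting_name(old_name):
    # Most new names are the old ones minus the "s3queue_" prefix; settings that
    # never had the prefix (mode, keeper_path, after_processing) are unchanged.
    # One outlier visible in this diff: enable_logging_to_s3queue_log became
    # enable_logging_to_queue_log (see getQueueLog above).
    if old_name == "s3queue_enable_logging_to_s3queue_log":
        return "enable_logging_to_queue_log"
    return old_name.removeprefix("s3queue_")

assert new_setting_name("s3queue_tracked_files_limit") == "tracked_files_limit"
assert new_setting_name("keeper_path") == "keeper_path"
```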

View File

@ -11,9 +11,9 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Disks/IDisk.h>
@ -44,7 +44,7 @@ StorageSystemS3Queue::StorageSystemS3Queue(const StorageID & table_id_)
void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
{
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
for (const auto & [zookeeper_path, metadata] : ObjectStorageQueueMetadataFactory::instance().getAll())
{
for (const auto & [file_path, file_status] : metadata->getFileStatuses())
{

View File

@ -34,6 +34,7 @@ void registerStorageFuzzJSON(StorageFactory & factory);
void registerStorageS3(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageS3Queue(StorageFactory & factory);
void registerStorageAzureQueue(StorageFactory & factory);
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
@ -126,6 +127,10 @@ void registerStorages()
registerStorageFuzzJSON(factory);
#endif
#if USE_AZURE_BLOB_STORAGE
registerStorageAzureQueue(factory);
#endif
#if USE_AWS_S3
registerStorageHudi(factory);
registerStorageS3Queue(factory);

View File

@ -12,6 +12,7 @@ import json
AVAILABLE_MODES = ["unordered", "ordered"]
DEFAULT_AUTH = ["'minio'", "'minio123'"]
NO_AUTH = ["NOSIGN"]
AZURE_CONTAINER_NAME = "cont"
def prepare_public_s3_bucket(started_cluster):
@ -84,6 +85,7 @@ def started_cluster():
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_azurite=True,
with_zookeeper=True,
main_configs=[
"configs/zookeeper.xml",
@ -126,6 +128,11 @@ def started_cluster():
cluster.start()
logging.info("Cluster started")
container_client = cluster.blob_service_client.get_container_client(
AZURE_CONTAINER_NAME
)
container_client.create_container()
yield cluster
finally:
cluster.shutdown()
@ -145,6 +152,7 @@ def generate_random_files(
started_cluster,
files_path,
count,
storage="s3",
column_num=3,
row_num=10,
start_ind=0,
@ -166,7 +174,10 @@ def generate_random_files(
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(started_cluster, filename, values_csv, bucket)
if storage == "s3":
put_s3_file_content(started_cluster, filename, values_csv, bucket)
else:
put_azure_file_content(started_cluster, filename, values_csv, bucket)
return total_values
@ -176,12 +187,21 @@ def put_s3_file_content(started_cluster, filename, data, bucket=None):
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
def put_azure_file_content(started_cluster, filename, data, bucket=None):
client = started_cluster.blob_service_client.get_blob_client(
AZURE_CONTAINER_NAME, filename
)
buf = io.BytesIO(data)
client.upload_blob(buf, "BlockBlob", len(data))
def create_table(
started_cluster,
node,
table_name,
mode,
files_path,
engine_name="S3Queue",
format="column1 UInt32, column2 UInt32, column3 UInt32",
additional_settings={},
file_format="CSV",
@ -200,11 +220,17 @@ def create_table(
}
settings.update(additional_settings)
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
engine_def = None
if engine_name == "S3Queue":
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
engine_def = f"{engine_name}('{url}', {auth_params}, {file_format})"
else:
engine_def = f"{engine_name}('{started_cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{files_path}/', 'CSV')"
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
ENGINE = S3Queue('{url}', {auth_params}, {file_format})
ENGINE = {engine_def}
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
@ -235,17 +261,22 @@ def create_mv(
)
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_delete_after_processing(started_cluster, mode, engine_name):
node = started_cluster.instances["instance"]
table_name = f"test.delete_after_processing_{mode}"
table_name = f"test.delete_after_processing_{mode}_{engine_name}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
files_num = 5
row_num = 10
if engine_name == "S3Queue":
storage = "s3"
else:
storage = "azure"
total_values = generate_random_files(
started_cluster, files_path, files_num, row_num=row_num
started_cluster, files_path, files_num, row_num=row_num, storage=storage
)
create_table(
started_cluster,
@ -254,6 +285,7 @@ def test_delete_after_processing(started_cluster, mode):
mode,
files_path,
additional_settings={"after_processing": "delete"},
engine_name=engine_name,
)
create_mv(node, table_name, dst_table_name)
@ -274,15 +306,24 @@ def test_delete_after_processing(started_cluster, mode):
).splitlines()
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
if engine_name == "S3Queue":
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
else:
client = started_cluster.blob_service_client.get_container_client(
AZURE_CONTAINER_NAME
)
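# after_processing = 'delete' removes each blob once ingested, so listing the prefix must yield nothing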
objects_iterator = client.list_blobs(files_path)
for objects in objects_iterator:
assert False
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
@pytest.mark.parametrize("mode", ["unordered", "ordered"])
@pytest.mark.parametrize("engine_name", ["S3Queue", "AzureQueue"])
def test_failed_retry(started_cluster, mode, engine_name):
node = started_cluster.instances["instance"]
table_name = f"test.failed_retry_{mode}"
table_name = f"test.failed_retry_{mode}_{engine_name}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
file_path = f"{files_path}/trash_test.csv"
@ -295,7 +336,10 @@ def test_failed_retry(started_cluster, mode):
values_csv = (
"\n".join((",".join(map(str, row)) for row in values)) + "\n"
).encode()
put_s3_file_content(started_cluster, file_path, values_csv)
if engine_name == "S3Queue":
put_s3_file_content(started_cluster, file_path, values_csv)
else:
put_azure_file_content(started_cluster, file_path, values_csv)
create_table(
started_cluster,
@ -307,6 +351,7 @@ def test_failed_retry(started_cluster, mode):
"s3queue_loading_retries": retries_num,
"keeper_path": keeper_path,
},
engine_name=engine_name,
)
create_mv(node, table_name, dst_table_name)
@ -792,12 +837,12 @@ def test_max_set_age(started_cluster):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_file_ttl_sec": max_age,
"s3queue_cleanup_interval_min_ms": max_age / 3,
"s3queue_cleanup_interval_max_ms": max_age / 3,
"s3queue_loading_retries": 0,
"s3queue_processing_threads_num": 1,
"s3queue_loading_retries": 0,
"tracked_file_ttl_sec": max_age,
"cleanup_interval_min_ms": max_age / 3,
"cleanup_interval_max_ms": max_age / 3,
"loading_retries": 0,
"processing_threads_num": 1,
"loading_retries": 0,
},
)
create_mv(node, table_name, dst_table_name)
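
The settings rename above only drops the `s3queue_` prefix from each name; a hedged equivalence sketch with illustrative values:

```
# Both spellings address the same settings; the test now uses the short form.
prefixed = {"s3queue_tracked_file_ttl_sec": 10, "s3queue_processing_threads_num": 1}
unprefixed = {"tracked_file_ttl_sec": 10, "processing_threads_num": 1}
assert [k.removeprefix("s3queue_") for k in prefixed] == list(unprefixed)
```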
@ -846,7 +891,7 @@ def test_max_set_age(started_cluster):
failed_count = int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
@ -861,7 +906,7 @@ def test_max_set_age(started_cluster):
for _ in range(30):
if failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
):
break
@ -869,7 +914,7 @@ def test_max_set_age(started_cluster):
assert failed_count + 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)
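
The renamed counter is read the same way in several places; a small hypothetical helper (not part of this commit) that the polling loops could share:

```
def get_failed_files_count(node):
    # Reads the renamed ObjectStorageQueueFailedFiles counter; the SETTINGS
    # clause makes the query return 0 instead of no rows before any failure.
    return int(
        node.query(
            "SELECT value FROM system.events "
            "WHERE name = 'ObjectStorageQueueFailedFiles' "
            "SETTINGS system_events_show_zero_values=1"
        )
    )
```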
@ -891,7 +936,9 @@ def test_max_set_age(started_cluster):
time.sleep(max_age + 1)
assert failed_count + 2 == int(
node.query("SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles'")
node.query(
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles'"
)
)
node.query("SYSTEM FLUSH LOGS")
@ -1435,7 +1482,7 @@ def test_settings_check(started_cluster):
)
assert (
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. Stored in ZooKeeper: 2, local: 3"
"Existing table metadata in ZooKeeper differs in buckets setting. Stored in ZooKeeper: 2, local: 3"
in create_table(
started_cluster,
node_2,
@ -1737,7 +1784,7 @@ def test_commit_on_limit(started_cluster):
assert 1 == int(
node.query(
"SELECT value FROM system.events WHERE name = 'S3QueueFailedFiles' SETTINGS system_events_show_zero_values=1"
"SELECT value FROM system.events WHERE name = 'ObjectStorageQueueFailedFiles' SETTINGS system_events_show_zero_values=1"
)
)