A few code renames in preparation for AzureQueue

This commit is contained in:
kssenii 2024-06-18 19:06:06 +02:00
parent 22f6095484
commit 78ccd03dd6
34 changed files with 547 additions and 544 deletions

View File

@ -222,7 +222,7 @@ add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)
add_object_library(clickhouse_storages_s3queue Storages/ObjectStorageQueue)
add_object_library(clickhouse_storages_materializedview Storages/MaterializedView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_bridge BridgeHelper)

View File

@ -27,7 +27,7 @@
M(ZooKeeperLogElement) \
M(ProcessorProfileLogElement) \
M(TextLogElement) \
M(S3QueueLogElement) \
M(ObjectStorageQueueLogElement) \
M(FilesystemCacheLogElement) \
M(FilesystemReadPrefetchesLogElement) \
M(AsynchronousInsertLogElement) \

View File

@ -201,13 +201,13 @@ IMPLEMENT_SETTING_ENUM(ORCCompression, ErrorCodes::BAD_ARGUMENTS,
{"zlib", FormatSettings::ORCCompression::ZLIB},
{"lz4", FormatSettings::ORCCompression::LZ4}})
IMPLEMENT_SETTING_ENUM(S3QueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", S3QueueMode::ORDERED},
{"unordered", S3QueueMode::UNORDERED}})
IMPLEMENT_SETTING_ENUM(ObjectStorageQueueMode, ErrorCodes::BAD_ARGUMENTS,
{{"ordered", ObjectStorageQueueMode::ORDERED},
{"unordered", ObjectStorageQueueMode::UNORDERED}})
IMPLEMENT_SETTING_ENUM(S3QueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", S3QueueAction::KEEP},
{"delete", S3QueueAction::DELETE}})
IMPLEMENT_SETTING_ENUM(ObjectStorageQueueAction, ErrorCodes::BAD_ARGUMENTS,
{{"keep", ObjectStorageQueueAction::KEEP},
{"delete", ObjectStorageQueueAction::DELETE}})
IMPLEMENT_SETTING_ENUM(ExternalCommandStderrReaction, ErrorCodes::BAD_ARGUMENTS,
{{"none", ExternalCommandStderrReaction::NONE},

View File

@ -341,21 +341,21 @@ DECLARE_SETTING_ENUM(ParallelReplicasCustomKeyFilterType)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
enum class S3QueueMode : uint8_t
enum class ObjectStorageQueueMode : uint8_t
{
ORDERED,
UNORDERED,
};
DECLARE_SETTING_ENUM(S3QueueMode)
DECLARE_SETTING_ENUM(ObjectStorageQueueMode)
enum class S3QueueAction : uint8_t
enum class ObjectStorageQueueAction : uint8_t
{
KEEP,
DELETE,
};
DECLARE_SETTING_ENUM(S3QueueAction)
DECLARE_SETTING_ENUM(ObjectStorageQueueAction)
DECLARE_SETTING_ENUM(ExternalCommandStderrReaction)

View File

@ -4118,7 +4118,7 @@ std::shared_ptr<FilesystemCacheLog> Context::getFilesystemCacheLog() const
return shared->system_logs->filesystem_cache_log;
}
std::shared_ptr<S3QueueLog> Context::getS3QueueLog() const
std::shared_ptr<ObjectStorageQueueLog> Context::getS3QueueLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)

View File

@ -106,7 +106,7 @@ class TransactionsInfoLog;
class ProcessorsProfileLog;
class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class S3QueueLog;
class ObjectStorageQueueLog;
class AsynchronousInsertLog;
class BackupLog;
class BlobStorageLog;
@ -1106,7 +1106,7 @@ public:
std::shared_ptr<TransactionsInfoLog> getTransactionsInfoLog() const;
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<S3QueueLog> getS3QueueLog() const;
std::shared_ptr<ObjectStorageQueueLog> getS3QueueLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;

View File

@ -14,13 +14,13 @@
namespace DB
{
ColumnsDescription S3QueueLogElement::getColumnsDescription()
ColumnsDescription ObjectStorageQueueLogElement::getColumnsDescription()
{
auto status_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"Processed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Processed)},
{"Failed", static_cast<Int8>(S3QueueLogElement::S3QueueStatus::Failed)},
{"Processed", static_cast<Int8>(ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed)},
{"Failed", static_cast<Int8>(ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed)},
});
return ColumnsDescription
@ -41,7 +41,7 @@ ColumnsDescription S3QueueLogElement::getColumnsDescription()
};
}
void S3QueueLogElement::appendToBlock(MutableColumns & columns) const
void ObjectStorageQueueLogElement::appendToBlock(MutableColumns & columns) const
{
size_t i = 0;
columns[i++]->insert(getFQDNOrHostName());

View File

@ -9,7 +9,7 @@
namespace DB
{
struct S3QueueLogElement
struct ObjectStorageQueueLogElement
{
time_t event_time{};
@ -20,18 +20,18 @@ struct S3QueueLogElement
std::string file_name;
size_t rows_processed = 0;
enum class S3QueueStatus : uint8_t
enum class ObjectStorageQueueStatus : uint8_t
{
Processed,
Failed,
};
S3QueueStatus status;
ObjectStorageQueueStatus status;
ProfileEvents::Counters::Snapshot counters_snapshot;
time_t processing_start_time;
time_t processing_end_time;
std::string exception;
static std::string name() { return "S3QueueLog"; }
static std::string name() { return "ObjectStorageQueueLog"; }
static ColumnsDescription getColumnsDescription();
static NamesAndAliases getNamesAndAliases() { return {}; }
@ -39,9 +39,9 @@ struct S3QueueLogElement
void appendToBlock(MutableColumns & columns) const;
};
class S3QueueLog : public SystemLog<S3QueueLogElement>
class ObjectStorageQueueLog : public SystemLog<ObjectStorageQueueLogElement>
{
using SystemLog<S3QueueLogElement>::SystemLog;
using SystemLog<ObjectStorageQueueLogElement>::SystemLog;
};
}

View File

@ -303,7 +303,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
processors_profile_log = createSystemLog<ProcessorsProfileLog>(global_context, "system", "processors_profile_log", config, "processors_profile_log", "Contains profiling information on processors level (building blocks for a pipeline for query execution.");
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log", "Contains a history for all asynchronous inserts executed on current server.");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log", "Contains logging entries with the information about BACKUP and RESTORE operations.");
s3_queue_log = createSystemLog<S3QueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
s3_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with the information files processes by S3Queue engine.");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes.");
if (query_log)

View File

@ -52,7 +52,7 @@ class FilesystemCacheLog;
class FilesystemReadPrefetchesLog;
class AsynchronousInsertLog;
class BackupLog;
class S3QueueLog;
class ObjectStorageQueueLog;
class BlobStorageLog;
/// System logs should be destroyed in destructor of the last Context and before tables,
@ -74,7 +74,7 @@ struct SystemLogs
std::shared_ptr<MetricLog> metric_log; /// Used to log all metrics.
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<S3QueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.

View File

@ -15,7 +15,7 @@ class SchemaCache;
class StorageObjectStorageSource : public SourceWithKeyCondition, WithContext
{
friend class StorageS3QueueSource;
friend class ObjectStorageQueueSource;
public:
using Configuration = StorageObjectStorage::Configuration;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Common/SipHash.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
@ -35,19 +35,19 @@ namespace
}
}
void S3QueueIFileMetadata::FileStatus::onProcessing()
void ObjectStorageQueueIFileMetadata::FileStatus::onProcessing()
{
state = FileStatus::State::Processing;
processing_start_time = now();
}
void S3QueueIFileMetadata::FileStatus::onProcessed()
void ObjectStorageQueueIFileMetadata::FileStatus::onProcessed()
{
state = FileStatus::State::Processed;
processing_end_time = now();
}
void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
void ObjectStorageQueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
{
state = FileStatus::State::Failed;
processing_end_time = now();
@ -55,13 +55,13 @@ void S3QueueIFileMetadata::FileStatus::onFailed(const std::string & exception)
last_exception = exception;
}
std::string S3QueueIFileMetadata::FileStatus::getException() const
std::string ObjectStorageQueueIFileMetadata::FileStatus::getException() const
{
std::lock_guard lock(last_exception_mutex);
return last_exception;
}
std::string S3QueueIFileMetadata::NodeMetadata::toString() const
std::string ObjectStorageQueueIFileMetadata::NodeMetadata::toString() const
{
Poco::JSON::Object json;
json.set("file_path", file_path);
@ -76,7 +76,7 @@ std::string S3QueueIFileMetadata::NodeMetadata::toString() const
return oss.str();
}
S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::NodeMetadata::fromString(const std::string & metadata_str)
ObjectStorageQueueIFileMetadata::NodeMetadata ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(const std::string & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
@ -91,7 +91,7 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::NodeMetadata::fromStrin
return metadata;
}
S3QueueIFileMetadata::S3QueueIFileMetadata(
ObjectStorageQueueIFileMetadata::ObjectStorageQueueIFileMetadata(
const std::string & path_,
const std::string & processing_node_path_,
const std::string & processed_node_path_,
@ -116,7 +116,7 @@ S3QueueIFileMetadata::S3QueueIFileMetadata(
processed_node_path, processing_node_path, failed_node_path);
}
S3QueueIFileMetadata::~S3QueueIFileMetadata()
ObjectStorageQueueIFileMetadata::~ObjectStorageQueueIFileMetadata()
{
if (processing_id_version.has_value())
{
@ -148,9 +148,9 @@ S3QueueIFileMetadata::~S3QueueIFileMetadata()
}
}
std::string S3QueueIFileMetadata::getNodeName(const std::string & path)
std::string ObjectStorageQueueIFileMetadata::getNodeName(const std::string & path)
{
/// Since with are dealing with paths in s3 which can have "/",
/// Since with are dealing with paths in object storage which can have "/",
/// we cannot create a zookeeper node with the name equal to path.
/// Therefore we use a hash of the path as a node name.
@ -159,7 +159,7 @@ std::string S3QueueIFileMetadata::getNodeName(const std::string & path)
return toString(path_hash.get64());
}
S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
ObjectStorageQueueIFileMetadata::NodeMetadata ObjectStorageQueueIFileMetadata::createNodeMetadata(
const std::string & path,
const std::string & exception,
size_t retries)
@ -168,9 +168,9 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
/// Since node name is just a hash we want to know to which file it corresponds,
/// so we keep "file_path" in nodes data.
/// "last_processed_timestamp" is needed for TTL metadata nodes enabled by s3queue_tracked_file_ttl_sec.
/// "last_processed_timestamp" is needed for TTL metadata nodes enabled by tracked_file_ttl_sec.
/// "last_exception" is kept for introspection, should also be visible in system.s3queue_log if it is enabled.
/// "retries" is kept for retrying the processing enabled by s3queue_loading_retries.
/// "retries" is kept for retrying the processing enabled by loading_retries.
NodeMetadata metadata;
metadata.file_path = path;
metadata.last_processed_timestamp = now();
@ -179,7 +179,7 @@ S3QueueIFileMetadata::NodeMetadata S3QueueIFileMetadata::createNodeMetadata(
return metadata;
}
std::string S3QueueIFileMetadata::getProcessorInfo(const std::string & processor_id)
std::string ObjectStorageQueueIFileMetadata::getProcessorInfo(const std::string & processor_id)
{
/// Add information which will be useful for debugging just in case.
Poco::JSON::Object json;
@ -192,7 +192,7 @@ std::string S3QueueIFileMetadata::getProcessorInfo(const std::string & processor
return oss.str();
}
bool S3QueueIFileMetadata::setProcessing()
bool ObjectStorageQueueIFileMetadata::setProcessing()
{
auto state = file_status->state.load();
if (state == FileStatus::State::Processing
@ -221,7 +221,7 @@ bool S3QueueIFileMetadata::setProcessing()
return success;
}
void S3QueueIFileMetadata::setProcessed()
void ObjectStorageQueueIFileMetadata::setProcessed()
{
LOG_TRACE(log, "Setting file {} as processed (path: {})", path, processed_node_path);
@ -235,7 +235,7 @@ void S3QueueIFileMetadata::setProcessed()
LOG_TRACE(log, "Set file {} as processed (rows: {})", path, file_status->processed_rows);
}
void S3QueueIFileMetadata::setFailed(const std::string & exception)
void ObjectStorageQueueIFileMetadata::setFailed(const std::string & exception)
{
LOG_TRACE(log, "Setting file {} as failed (exception: {}, path: {})", path, exception, failed_node_path);
@ -254,7 +254,7 @@ void S3QueueIFileMetadata::setFailed(const std::string & exception)
LOG_TRACE(log, "Set file {} as failed (rows: {})", path, file_status->processed_rows);
}
void S3QueueIFileMetadata::setFailedNonRetriable()
void ObjectStorageQueueIFileMetadata::setFailedNonRetriable()
{
auto zk_client = getZooKeeper();
Coordination::Requests requests;
@ -285,7 +285,7 @@ void S3QueueIFileMetadata::setFailedNonRetriable()
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error while setting file as failed: {}", code);
}
void S3QueueIFileMetadata::setFailedRetriable()
void ObjectStorageQueueIFileMetadata::setFailedRetriable()
{
/// Instead of creating a persistent /failed/node_hash node
/// we create a persistent /failed/node_hash.retriable node.

View File

@ -6,7 +6,7 @@
namespace DB
{
class S3QueueIFileMetadata
class ObjectStorageQueueIFileMetadata
{
public:
struct FileStatus
@ -41,7 +41,7 @@ public:
};
using FileStatusPtr = std::shared_ptr<FileStatus>;
explicit S3QueueIFileMetadata(
explicit ObjectStorageQueueIFileMetadata(
const std::string & path_,
const std::string & processing_node_path_,
const std::string & processed_node_path_,
@ -50,7 +50,7 @@ public:
size_t max_loading_retries_,
LoggerPtr log_);
virtual ~S3QueueIFileMetadata();
virtual ~ObjectStorageQueueIFileMetadata();
bool setProcessing();
void setProcessed();
@ -92,7 +92,7 @@ protected:
LoggerPtr log;
/// processing node is ephemeral, so we cannot verify with it if
/// this node was created by a certain processor on a previous s3 queue processing stage,
/// this node was created by a certain processor on a previous processing stage,
/// because we could get a session expired in between the stages
/// and someone else could just create this processing node.
/// Therefore we also create a persistent processing node

View File

@ -4,13 +4,12 @@
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Interpreters/Context.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueUnorderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <IO/S3Settings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/StorageSnapshot.h>
#include <base/sleep.h>
#include <Common/CurrentThread.h>
@ -63,7 +62,7 @@ namespace
}
}
class S3QueueMetadata::LocalFileStatuses
class ObjectStorageQueueMetadata::LocalFileStatuses
{
public:
LocalFileStatuses() = default;
@ -114,90 +113,81 @@ private:
}
};
S3QueueMetadata::S3QueueMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_)
ObjectStorageQueueMetadata::ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_)
: settings(settings_)
, zookeeper_path(zookeeper_path_)
, buckets_num(getBucketsNum(settings_))
, log(getLogger("StorageS3Queue(" + zookeeper_path_.string() + ")"))
, log(getLogger("StorageObjectStorageQueue(" + zookeeper_path_.string() + ")"))
, local_file_statuses(std::make_shared<LocalFileStatuses>())
{
if (settings.mode == S3QueueMode::UNORDERED
&& (settings.s3queue_tracked_files_limit || settings.s3queue_tracked_file_ttl_sec))
if (settings.mode == ObjectStorageQueueMode::UNORDERED
&& (settings.tracked_files_limit || settings.tracked_file_ttl_sec))
{
task = Context::getGlobalContextInstance()->getSchedulePool().createTask(
"S3QueueCleanupFunc",
"ObjectStorageQueueCleanupFunc",
[this] { cleanupThreadFunc(); });
task->activate();
task->scheduleAfter(
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
settings.cleanup_interval_min_ms, settings.cleanup_interval_max_ms));
}
}
S3QueueMetadata::~S3QueueMetadata()
ObjectStorageQueueMetadata::~ObjectStorageQueueMetadata()
{
shutdown();
}
void S3QueueMetadata::shutdown()
void ObjectStorageQueueMetadata::shutdown()
{
shutdown_called = true;
if (task)
task->deactivate();
}
void S3QueueMetadata::checkSettings(const S3QueueSettings & settings_) const
void ObjectStorageQueueMetadata::checkSettings(const ObjectStorageQueueSettings & settings_) const
{
S3QueueTableMetadata::checkEquals(settings, settings_);
ObjectStorageQueueTableMetadata::checkEquals(settings, settings_);
}
S3QueueMetadata::FileStatusPtr S3QueueMetadata::getFileStatus(const std::string & path)
ObjectStorageQueueMetadata::FileStatusPtr ObjectStorageQueueMetadata::getFileStatus(const std::string & path)
{
return local_file_statuses->get(path, /* create */false);
}
S3QueueMetadata::FileStatuses S3QueueMetadata::getFileStatuses() const
ObjectStorageQueueMetadata::FileStatuses ObjectStorageQueueMetadata::getFileStatuses() const
{
return local_file_statuses->getAll();
}
S3QueueMetadata::FileMetadataPtr S3QueueMetadata::getFileMetadata(
ObjectStorageQueueMetadata::FileMetadataPtr ObjectStorageQueueMetadata::getFileMetadata(
const std::string & path,
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info)
ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info)
{
auto file_status = local_file_statuses->get(path, /* create */true);
switch (settings.mode.value)
{
case S3QueueMode::ORDERED:
return std::make_shared<S3QueueOrderedFileMetadata>(
case ObjectStorageQueueMode::ORDERED:
return std::make_shared<ObjectStorageQueueOrderedFileMetadata>(
zookeeper_path,
path,
file_status,
bucket_info,
buckets_num,
settings.s3queue_loading_retries,
settings.loading_retries,
log);
case S3QueueMode::UNORDERED:
return std::make_shared<S3QueueUnorderedFileMetadata>(
case ObjectStorageQueueMode::UNORDERED:
return std::make_shared<ObjectStorageQueueUnorderedFileMetadata>(
zookeeper_path,
path,
file_status,
settings.s3queue_loading_retries,
settings.loading_retries,
log);
}
}
size_t S3QueueMetadata::getBucketsNum(const S3QueueSettings & settings)
{
if (settings.s3queue_buckets)
return settings.s3queue_buckets;
if (settings.s3queue_processing_threads_num)
return settings.s3queue_processing_threads_num;
return 0;
}
size_t S3QueueMetadata::getBucketsNum(const S3QueueTableMetadata & settings)
size_t ObjectStorageQueueMetadata::getBucketsNum(const ObjectStorageQueueSettings & settings)
{
if (settings.buckets)
return settings.buckets;
@ -206,32 +196,41 @@ size_t S3QueueMetadata::getBucketsNum(const S3QueueTableMetadata & settings)
return 0;
}
bool S3QueueMetadata::useBucketsForProcessing() const
size_t ObjectStorageQueueMetadata::getBucketsNum(const ObjectStorageQueueTableMetadata & settings)
{
return settings.mode == S3QueueMode::ORDERED && (buckets_num > 1);
if (settings.buckets)
return settings.buckets;
if (settings.processing_threads_num)
return settings.processing_threads_num;
return 0;
}
S3QueueMetadata::Bucket S3QueueMetadata::getBucketForPath(const std::string & path) const
bool ObjectStorageQueueMetadata::useBucketsForProcessing() const
{
return S3QueueOrderedFileMetadata::getBucketForPath(path, buckets_num);
return settings.mode == ObjectStorageQueueMode::ORDERED && (buckets_num > 1);
}
S3QueueOrderedFileMetadata::BucketHolderPtr
S3QueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
ObjectStorageQueueMetadata::Bucket ObjectStorageQueueMetadata::getBucketForPath(const std::string & path) const
{
return S3QueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor);
return ObjectStorageQueueOrderedFileMetadata::getBucketForPath(path, buckets_num);
}
void S3QueueMetadata::initialize(
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr
ObjectStorageQueueMetadata::tryAcquireBucket(const Bucket & bucket, const Processor & processor)
{
return ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(zookeeper_path, bucket, processor);
}
void ObjectStorageQueueMetadata::initialize(
const ConfigurationPtr & configuration,
const StorageInMemoryMetadata & storage_metadata)
{
const auto metadata_from_table = S3QueueTableMetadata(*configuration, settings, storage_metadata);
const auto metadata_from_table = ObjectStorageQueueTableMetadata(*configuration, settings, storage_metadata);
const auto & columns_from_table = storage_metadata.getColumns();
const auto table_metadata_path = zookeeper_path / "metadata";
const auto metadata_paths = settings.mode == S3QueueMode::ORDERED
? S3QueueOrderedFileMetadata::getMetadataPaths(buckets_num)
: S3QueueUnorderedFileMetadata::getMetadataPaths();
const auto metadata_paths = settings.mode == ObjectStorageQueueMode::ORDERED
? ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(buckets_num)
: ObjectStorageQueueUnorderedFileMetadata::getMetadataPaths();
auto zookeeper = getZooKeeper();
zookeeper->createAncestors(zookeeper_path);
@ -240,7 +239,7 @@ void S3QueueMetadata::initialize(
{
if (zookeeper->exists(table_metadata_path))
{
const auto metadata_from_zk = S3QueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata"));
const auto metadata_from_zk = ObjectStorageQueueTableMetadata::parse(zookeeper->get(fs::path(zookeeper_path) / "metadata"));
const auto columns_from_zk = ColumnsDescription::parse(metadata_from_zk.columns);
metadata_from_table.checkEquals(metadata_from_zk);
@ -265,8 +264,8 @@ void S3QueueMetadata::initialize(
requests.emplace_back(zkutil::makeCreateRequest(zk_path, "", zkutil::CreateMode::Persistent));
}
if (!settings.s3queue_last_processed_path.value.empty())
getFileMetadata(settings.s3queue_last_processed_path)->setProcessedAtStartRequests(requests, zookeeper);
if (!settings.last_processed_path.value.empty())
getFileMetadata(settings.last_processed_path)->setProcessedAtStartRequests(requests, zookeeper);
Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses);
@ -290,10 +289,10 @@ void S3QueueMetadata::initialize(
"of wrong zookeeper path or because of logical error");
}
void S3QueueMetadata::cleanupThreadFunc()
void ObjectStorageQueueMetadata::cleanupThreadFunc()
{
/// A background task is responsible for maintaining
/// settings.s3queue_tracked_files_limit and max_set_age settings for `unordered` processing mode.
/// settings.tracked_files_limit and max_set_age settings for `unordered` processing mode.
if (shutdown_called)
return;
@ -312,10 +311,10 @@ void S3QueueMetadata::cleanupThreadFunc()
task->scheduleAfter(
generateRescheduleInterval(
settings.s3queue_cleanup_interval_min_ms, settings.s3queue_cleanup_interval_max_ms));
settings.cleanup_interval_min_ms, settings.cleanup_interval_max_ms));
}
void S3QueueMetadata::cleanupThreadFuncImpl()
void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds);
const auto zk_client = getZooKeeper();
@ -355,11 +354,11 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
return;
}
chassert(settings.s3queue_tracked_files_limit || settings.s3queue_tracked_file_ttl_sec);
const bool check_nodes_limit = settings.s3queue_tracked_files_limit > 0;
const bool check_nodes_ttl = settings.s3queue_tracked_file_ttl_sec > 0;
chassert(settings.tracked_files_limit || settings.tracked_file_ttl_sec);
const bool check_nodes_limit = settings.tracked_files_limit > 0;
const bool check_nodes_ttl = settings.tracked_file_ttl_sec > 0;
const bool nodes_limit_exceeded = nodes_num > settings.s3queue_tracked_files_limit;
const bool nodes_limit_exceeded = nodes_num > settings.tracked_files_limit;
if ((!nodes_limit_exceeded || !check_nodes_limit) && !check_nodes_ttl)
{
LOG_TEST(log, "No limit exceeded");
@ -381,7 +380,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
struct Node
{
std::string zk_path;
S3QueueIFileMetadata::NodeMetadata metadata;
ObjectStorageQueueIFileMetadata::NodeMetadata metadata;
};
auto node_cmp = [](const Node & a, const Node & b)
{
@ -402,7 +401,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
std::string metadata_str;
if (zk_client->tryGet(path, metadata_str))
{
sorted_nodes.emplace(path, S3QueueIFileMetadata::NodeMetadata::fromString(metadata_str));
sorted_nodes.emplace(path, ObjectStorageQueueIFileMetadata::NodeMetadata::fromString(metadata_str));
LOG_TEST(log, "Fetched metadata for node {}", path);
}
else
@ -432,9 +431,9 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
wb << fmt::format("Node: {}, path: {}, timestamp: {};\n", node, metadata.file_path, metadata.last_processed_timestamp);
return wb.str();
};
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", settings.s3queue_tracked_files_limit, settings.s3queue_tracked_file_ttl_sec, get_nodes_str());
LOG_TEST(log, "Checking node limits (max size: {}, max age: {}) for {}", settings.tracked_files_limit, settings.tracked_file_ttl_sec, get_nodes_str());
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - settings.s3queue_tracked_files_limit : 0;
size_t nodes_to_remove = check_nodes_limit && nodes_limit_exceeded ? nodes_num - settings.tracked_files_limit : 0;
for (const auto & node : sorted_nodes)
{
if (nodes_to_remove)
@ -453,7 +452,7 @@ void S3QueueMetadata::cleanupThreadFuncImpl()
else if (check_nodes_ttl)
{
UInt64 node_age = getCurrentTime() - node.metadata.last_processed_timestamp;
if (node_age >= settings.s3queue_tracked_file_ttl_sec)
if (node_age >= settings.tracked_file_ttl_sec)
{
LOG_TRACE(log, "Removing node at path {} ({}) because file ttl is reached",
node.metadata.file_path, node.zk_path);

View File

@ -7,23 +7,23 @@
#include <Core/BackgroundSchedulePool.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
namespace fs = std::filesystem;
namespace Poco { class Logger; }
namespace DB
{
struct S3QueueSettings;
class StorageS3Queue;
struct S3QueueTableMetadata;
struct ObjectStorageQueueSettings;
class StorageObjectStorageQueue;
struct ObjectStorageQueueTableMetadata;
struct StorageInMemoryMetadata;
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
/**
* A class for managing S3Queue metadata in zookeeper, e.g.
* A class for managing ObjectStorageQueue metadata in zookeeper, e.g.
* the following folders:
* - <path_to_metadata>/processed
* - <path_to_metadata>/processing
@ -35,7 +35,7 @@ using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
* - <path_to_metadata>/processing
* - <path_to_metadata>/failed
*
* Depending on S3Queue processing mode (ordered or unordered)
* Depending on ObjectStorageQueue processing mode (ordered or unordered)
* we can differently store metadata in /processed node.
*
* Implements caching of zookeeper metadata for faster responses.
@ -44,24 +44,24 @@ using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
* In case of Unordered mode - if files TTL is enabled or maximum tracked files limit is set
* starts a background cleanup thread which is responsible for maintaining them.
*/
class S3QueueMetadata
class ObjectStorageQueueMetadata
{
public:
using FileStatus = S3QueueIFileMetadata::FileStatus;
using FileMetadataPtr = std::shared_ptr<S3QueueIFileMetadata>;
using FileStatus = ObjectStorageQueueIFileMetadata::FileStatus;
using FileMetadataPtr = std::shared_ptr<ObjectStorageQueueIFileMetadata>;
using FileStatusPtr = std::shared_ptr<FileStatus>;
using FileStatuses = std::unordered_map<std::string, FileStatusPtr>;
using Bucket = size_t;
using Processor = std::string;
S3QueueMetadata(const fs::path & zookeeper_path_, const S3QueueSettings & settings_);
~S3QueueMetadata();
ObjectStorageQueueMetadata(const fs::path & zookeeper_path_, const ObjectStorageQueueSettings & settings_);
~ObjectStorageQueueMetadata();
void initialize(const ConfigurationPtr & configuration, const StorageInMemoryMetadata & storage_metadata);
void checkSettings(const S3QueueSettings & settings) const;
void checkSettings(const ObjectStorageQueueSettings & settings) const;
void shutdown();
FileMetadataPtr getFileMetadata(const std::string & path, S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
FileMetadataPtr getFileMetadata(const std::string & path, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info = {});
FileStatusPtr getFileStatus(const std::string & path);
FileStatuses getFileStatuses() const;
@ -69,16 +69,16 @@ public:
/// Method of Ordered mode parallel processing.
bool useBucketsForProcessing() const;
Bucket getBucketForPath(const std::string & path) const;
S3QueueOrderedFileMetadata::BucketHolderPtr tryAcquireBucket(const Bucket & bucket, const Processor & processor);
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr tryAcquireBucket(const Bucket & bucket, const Processor & processor);
static size_t getBucketsNum(const S3QueueSettings & settings);
static size_t getBucketsNum(const S3QueueTableMetadata & settings);
static size_t getBucketsNum(const ObjectStorageQueueSettings & settings);
static size_t getBucketsNum(const ObjectStorageQueueTableMetadata & settings);
private:
void cleanupThreadFunc();
void cleanupThreadFuncImpl();
const S3QueueSettings settings;
const ObjectStorageQueueSettings settings;
const fs::path zookeeper_path;
const size_t buckets_num;

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Interpreters/Context.h>
namespace DB
@ -8,20 +8,20 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
S3QueueMetadataFactory & S3QueueMetadataFactory::instance()
ObjectStorageQueueMetadataFactory & ObjectStorageQueueMetadataFactory::instance()
{
static S3QueueMetadataFactory ret;
static ObjectStorageQueueMetadataFactory ret;
return ret;
}
S3QueueMetadataFactory::FilesMetadataPtr
S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings)
ObjectStorageQueueMetadataFactory::FilesMetadataPtr
ObjectStorageQueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
if (it == metadata_by_path.end())
{
auto files_metadata = std::make_shared<S3QueueMetadata>(zookeeper_path, settings);
auto files_metadata = std::make_shared<ObjectStorageQueueMetadata>(zookeeper_path, settings);
it = metadata_by_path.emplace(zookeeper_path, std::move(files_metadata)).first;
}
else
@ -32,7 +32,7 @@ S3QueueMetadataFactory::getOrCreate(const std::string & zookeeper_path, const S3
return it->second.metadata;
}
void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
void ObjectStorageQueueMetadataFactory::remove(const std::string & zookeeper_path)
{
std::lock_guard lock(mutex);
auto it = metadata_by_path.find(zookeeper_path);
@ -57,9 +57,9 @@ void S3QueueMetadataFactory::remove(const std::string & zookeeper_path)
}
}
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> S3QueueMetadataFactory::getAll()
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> ObjectStorageQueueMetadataFactory::getAll()
{
std::unordered_map<std::string, S3QueueMetadataFactory::FilesMetadataPtr> result;
std::unordered_map<std::string, ObjectStorageQueueMetadataFactory::FilesMetadataPtr> result;
for (const auto & [zk_path, metadata_and_ref_count] : metadata_by_path)
result.emplace(zk_path, metadata_and_ref_count.metadata);
return result;

View File

@ -0,0 +1,37 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
namespace DB
{
class ObjectStorageQueueMetadataFactory final : private boost::noncopyable
{
public:
using FilesMetadataPtr = std::shared_ptr<ObjectStorageQueueMetadata>;
static ObjectStorageQueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const ObjectStorageQueueSettings & settings);
void remove(const std::string & zookeeper_path);
std::unordered_map<std::string, FilesMetadataPtr> getAll();
private:
struct Metadata
{
explicit Metadata(std::shared_ptr<ObjectStorageQueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
std::shared_ptr<ObjectStorageQueueMetadata> metadata;
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
size_t ref_count = 0;
};
using MetadataByPath = std::unordered_map<std::string, Metadata>;
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueOrderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h>
#include <Common/SipHash.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
@ -16,7 +16,7 @@ namespace ErrorCodes
namespace
{
S3QueueOrderedFileMetadata::Bucket getBucketForPathImpl(const std::string & path, size_t buckets_num)
ObjectStorageQueueOrderedFileMetadata::Bucket getBucketForPathImpl(const std::string & path, size_t buckets_num)
{
return sipHash64(path) % buckets_num;
}
@ -40,7 +40,7 @@ namespace
}
}
S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
ObjectStorageQueueOrderedFileMetadata::BucketHolder::BucketHolder(
const Bucket & bucket_,
int bucket_version_,
const std::string & bucket_lock_path_,
@ -55,13 +55,13 @@ S3QueueOrderedFileMetadata::BucketHolder::BucketHolder(
{
}
void S3QueueOrderedFileMetadata::BucketHolder::release()
void ObjectStorageQueueOrderedFileMetadata::BucketHolder::release()
{
if (released)
return;
released = true;
LOG_TEST(getLogger("S3QueueBucketHolder"), "Releasing bucket {}", bucket_info->bucket);
LOG_TEST(getLogger("ObjectStorageQueueBucketHolder"), "Releasing bucket {}", bucket_info->bucket);
Coordination::Requests requests;
/// Check that bucket lock version has not changed
@ -75,7 +75,7 @@ void S3QueueOrderedFileMetadata::BucketHolder::release()
zkutil::KeeperMultiException::check(code, requests, responses);
}
S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
ObjectStorageQueueOrderedFileMetadata::BucketHolder::~BucketHolder()
{
try
{
@ -87,7 +87,7 @@ S3QueueOrderedFileMetadata::BucketHolder::~BucketHolder()
}
}
S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
ObjectStorageQueueOrderedFileMetadata::ObjectStorageQueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
@ -95,7 +95,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
size_t buckets_num_,
size_t max_loading_retries_,
LoggerPtr log_)
: S3QueueIFileMetadata(
: ObjectStorageQueueIFileMetadata(
path_,
/* processing_node_path */zk_path_ / "processing" / getNodeName(path_),
/* processed_node_path */getProcessedPath(zk_path_, path_, buckets_num_),
@ -109,7 +109,7 @@ S3QueueOrderedFileMetadata::S3QueueOrderedFileMetadata(
{
}
std::vector<std::string> S3QueueOrderedFileMetadata::getMetadataPaths(size_t buckets_num)
std::vector<std::string> ObjectStorageQueueOrderedFileMetadata::getMetadataPaths(size_t buckets_num)
{
if (buckets_num > 1)
{
@ -122,7 +122,7 @@ std::vector<std::string> S3QueueOrderedFileMetadata::getMetadataPaths(size_t buc
return {"failed", "processing"};
}
bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
bool ObjectStorageQueueOrderedFileMetadata::getMaxProcessedFile(
NodeMetadata & result,
Coordination::Stat * stat,
const zkutil::ZooKeeperPtr & zk_client)
@ -130,7 +130,7 @@ bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
return getMaxProcessedFile(result, stat, processed_node_path, zk_client);
}
bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
bool ObjectStorageQueueOrderedFileMetadata::getMaxProcessedFile(
NodeMetadata & result,
Coordination::Stat * stat,
const std::string & processed_node_path_,
@ -146,12 +146,12 @@ bool S3QueueOrderedFileMetadata::getMaxProcessedFile(
return false;
}
S3QueueOrderedFileMetadata::Bucket S3QueueOrderedFileMetadata::getBucketForPath(const std::string & path_, size_t buckets_num)
ObjectStorageQueueOrderedFileMetadata::Bucket ObjectStorageQueueOrderedFileMetadata::getBucketForPath(const std::string & path_, size_t buckets_num)
{
return getBucketForPathImpl(path_, buckets_num);
}
S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcquireBucket(
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr ObjectStorageQueueOrderedFileMetadata::tryAcquireBucket(
const std::filesystem::path & zk_path,
const Bucket & bucket,
const Processor & processor)
@ -172,7 +172,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
bucket_lock_id_path, processor_info, zkutil::CreateMode::Persistent, /* ignore_if_exists */true));
/// Update bucket lock id path. We use its version as a version of ephemeral bucket lock node.
/// (See comment near S3QueueIFileMetadata::processing_node_version).
/// (See comment near ObjectStorageQueueIFileMetadata::processing_node_version).
requests.push_back(zkutil::makeSetRequest(bucket_lock_id_path, processor_info, -1));
Coordination::Responses responses;
@ -183,7 +183,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
const auto bucket_lock_version = set_response->stat.version;
LOG_TEST(
getLogger("S3QueueOrderedFileMetadata"),
getLogger("ObjectStorageQueueOrderedFileMetadata"),
"Processor {} acquired bucket {} for processing (bucket lock version: {})",
processor, bucket, bucket_lock_version);
@ -204,7 +204,7 @@ S3QueueOrderedFileMetadata::BucketHolderPtr S3QueueOrderedFileMetadata::tryAcqui
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected error: {}", code);
}
std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetadata::setProcessingImpl()
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueOrderedFileMetadata::setProcessingImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType
@ -300,7 +300,7 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueOrderedFileMetad
}
}
void S3QueueOrderedFileMetadata::setProcessedAtStartRequests(
void ObjectStorageQueueOrderedFileMetadata::setProcessedAtStartRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client)
{
@ -318,7 +318,7 @@ void S3QueueOrderedFileMetadata::setProcessedAtStartRequests(
}
}
void S3QueueOrderedFileMetadata::setProcessedRequests(
void ObjectStorageQueueOrderedFileMetadata::setProcessedRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr & zk_client,
const std::string & processed_node_path_,
@ -359,7 +359,7 @@ void S3QueueOrderedFileMetadata::setProcessedRequests(
}
}
void S3QueueOrderedFileMetadata::setProcessedImpl()
void ObjectStorageQueueOrderedFileMetadata::setProcessedImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType

View File

@ -1,5 +1,5 @@
#pragma once
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <Common/logger_useful.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <filesystem>
@ -7,7 +7,7 @@
namespace DB
{
class S3QueueOrderedFileMetadata : public S3QueueIFileMetadata
class ObjectStorageQueueOrderedFileMetadata : public ObjectStorageQueueIFileMetadata
{
public:
using Processor = std::string;
@ -21,7 +21,7 @@ public:
};
using BucketInfoPtr = std::shared_ptr<const BucketInfo>;
explicit S3QueueOrderedFileMetadata(
explicit ObjectStorageQueueOrderedFileMetadata(
const std::filesystem::path & zk_path_,
const std::string & path_,
FileStatusPtr file_status_,
@ -38,7 +38,7 @@ public:
const Bucket & bucket,
const Processor & processor);
static S3QueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
static ObjectStorageQueueOrderedFileMetadata::Bucket getBucketForPath(const std::string & path, size_t buckets_num);
static std::vector<std::string> getMetadataPaths(size_t buckets_num);
@ -72,7 +72,7 @@ private:
bool ignore_if_exists);
};
struct S3QueueOrderedFileMetadata::BucketHolder
struct ObjectStorageQueueOrderedFileMetadata::BucketHolder
{
BucketHolder(
const Bucket & bucket_,

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Common/Exception.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
@ -13,14 +13,19 @@ namespace ErrorCodes
extern const int UNKNOWN_SETTING;
}
IMPLEMENT_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
IMPLEMENT_SETTINGS_TRAITS(ObjectStorageQueueSettingsTraits, LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS)
void S3QueueSettings::loadFromQuery(ASTStorage & storage_def)
void ObjectStorageQueueSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
try
{
/// We support settings starting with s3_ for compatibility.
for (auto & change : storage_def.settings->changes)
if (change.name.starts_with("s3queue_"))
change.name = change.name.substr(std::strlen("s3queue_"));
applyChanges(storage_def.settings->changes);
}
catch (Exception & e)

View File

@ -0,0 +1,47 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class ASTStorage;
#define OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(ObjectStorageQueueMode, \
mode, \
ObjectStorageQueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \
M(ObjectStorageQueueAction, after_processing, ObjectStorageQueueAction::KEEP, "Delete or keep file in after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt32, polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt32, polling_backoff_ms, 1000, "Polling backoff", 0) \
M(UInt32, tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
#define LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS(M, ALIAS) \
OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(ObjectStorageQueueSettingsTraits, LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS)
struct ObjectStorageQueueSettings : public BaseSettings<ObjectStorageQueueSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -5,17 +5,11 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/logger_useful.h>
#include <Common/getRandomASCIIString.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
#include <Storages/VirtualColumnUtils.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace CurrentMetrics
{
extern const Metric StorageS3Threads;
extern const Metric StorageS3ThreadsActive;
}
namespace ProfileEvents
{
extern const Event S3QueuePullMicroseconds;
@ -26,12 +20,11 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int LOGICAL_ERROR;
}
StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
ObjectStorageQueueSource::ObjectStorageQueueObjectInfo::ObjectStorageQueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_)
: ObjectInfo(object_info.relative_path, object_info.metadata)
@ -39,12 +32,12 @@ StorageS3QueueSource::S3QueueObjectInfo::S3QueueObjectInfo(
{
}
StorageS3QueueSource::FileIterator::FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
ObjectStorageQueueSource::FileIterator::FileIterator(
std::shared_ptr<ObjectStorageQueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_)
: StorageObjectStorageSource::IIterator("S3QueueIterator")
: StorageObjectStorageSource::IIterator("ObjectStorageQueueIterator")
, metadata(metadata_)
, glob_iterator(std::move(glob_iterator_))
, shutdown_called(shutdown_called_)
@ -52,15 +45,15 @@ StorageS3QueueSource::FileIterator::FileIterator(
{
}
size_t StorageS3QueueSource::FileIterator::estimatedKeysCount()
size_t ObjectStorageQueueSource::FileIterator::estimatedKeysCount()
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method estimateKeysCount is not implemented");
}
StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl(size_t processor)
ObjectStorageQueueSource::ObjectInfoPtr ObjectStorageQueueSource::FileIterator::nextImpl(size_t processor)
{
ObjectInfoPtr object_info;
S3QueueOrderedFileMetadata::BucketInfoPtr bucket_info;
ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr bucket_info;
while (!shutdown_called)
{
@ -80,13 +73,13 @@ StorageS3QueueSource::ObjectInfoPtr StorageS3QueueSource::FileIterator::nextImpl
auto file_metadata = metadata->getFileMetadata(object_info->relative_path, bucket_info);
if (file_metadata->setProcessing())
return std::make_shared<S3QueueObjectInfo>(*object_info, file_metadata);
return std::make_shared<ObjectStorageQueueObjectInfo>(*object_info, file_metadata);
}
return {};
}
std::pair<StorageS3QueueSource::ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr>
StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
std::pair<ObjectStorageQueueSource::ObjectInfoPtr, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr>
ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing s3 directory
/// and getting/putting result into listed_keys_cache.
@ -287,19 +280,19 @@ StorageS3QueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processo
}
}
StorageS3QueueSource::StorageS3QueueSource(
ObjectStorageQueueSource::ObjectStorageQueueSource(
String name_,
size_t processor_id_,
const Block & header_,
std::unique_ptr<StorageObjectStorageSource> internal_source_,
std::shared_ptr<S3QueueMetadata> files_metadata_,
const S3QueueAction & action_,
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata_,
const ObjectStorageQueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_)
: ISource(header_)
@ -319,12 +312,12 @@ StorageS3QueueSource::StorageS3QueueSource(
{
}
String StorageS3QueueSource::getName() const
String ObjectStorageQueueSource::getName() const
{
return name;
}
void StorageS3QueueSource::lazyInitialize(size_t processor)
void ObjectStorageQueueSource::lazyInitialize(size_t processor)
{
if (initialized)
return;
@ -336,7 +329,7 @@ void StorageS3QueueSource::lazyInitialize(size_t processor)
initialized = true;
}
Chunk StorageS3QueueSource::generate()
Chunk ObjectStorageQueueSource::generate()
{
lazyInitialize(processor_id);
@ -345,7 +338,7 @@ Chunk StorageS3QueueSource::generate()
if (!reader)
break;
const auto * object_info = dynamic_cast<const S3QueueObjectInfo *>(&reader.getObjectInfo());
const auto * object_info = dynamic_cast<const ObjectStorageQueueObjectInfo *>(&reader.getObjectInfo());
auto file_metadata = object_info->processing_holder;
auto file_status = file_metadata->getFileStatus();
@ -473,33 +466,33 @@ Chunk StorageS3QueueSource::generate()
return {};
}
void StorageS3QueueSource::applyActionAfterProcessing(const String & path)
void ObjectStorageQueueSource::applyActionAfterProcessing(const String & path)
{
switch (action)
{
case S3QueueAction::DELETE:
case ObjectStorageQueueAction::DELETE:
{
assert(remove_file_func);
remove_file_func(path);
break;
}
case S3QueueAction::KEEP:
case ObjectStorageQueueAction::KEEP:
break;
}
}
void StorageS3QueueSource::appendLogElement(
void ObjectStorageQueueSource::appendLogElement(
const std::string & filename,
S3QueueMetadata::FileStatus & file_status_,
ObjectStorageQueueMetadata::FileStatus & file_status_,
size_t processed_rows,
bool processed)
{
if (!s3_queue_log)
return;
S3QueueLogElement elem{};
ObjectStorageQueueLogElement elem{};
{
elem = S3QueueLogElement
elem = ObjectStorageQueueLogElement
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.database = storage_id.database_name,
@ -507,7 +500,7 @@ void StorageS3QueueSource::appendLogElement(
.uuid = toString(storage_id.uuid),
.file_name = filename,
.rows_processed = processed_rows,
.status = processed ? S3QueueLogElement::S3QueueStatus::Processed : S3QueueLogElement::S3QueueStatus::Failed,
.status = processed ? ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Processed : ObjectStorageQueueLogElement::ObjectStorageQueueStatus::Failed,
.counters_snapshot = file_status_.profile_counters.getPartiallyAtomicSnapshot(),
.processing_start_time = file_status_.processing_start_time,
.processing_end_time = file_status_.processing_end_time,

View File

@ -3,7 +3,7 @@
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Processors/ISource.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Interpreters/S3QueueLog.h>
@ -16,7 +16,7 @@ namespace DB
struct ObjectMetadata;
class StorageS3QueueSource : public ISource, WithContext
class ObjectStorageQueueSource : public ISource, WithContext
{
public:
using Storage = StorageObjectStorage;
@ -24,16 +24,16 @@ public:
using GlobIterator = StorageObjectStorageSource::GlobIterator;
using ZooKeeperGetter = std::function<zkutil::ZooKeeperPtr()>;
using RemoveFileFunc = std::function<void(std::string)>;
using FileStatusPtr = S3QueueMetadata::FileStatusPtr;
using FileStatusPtr = ObjectStorageQueueMetadata::FileStatusPtr;
using ReaderHolder = StorageObjectStorageSource::ReaderHolder;
using Metadata = S3QueueMetadata;
using Metadata = ObjectStorageQueueMetadata;
using ObjectInfo = StorageObjectStorageSource::ObjectInfo;
using ObjectInfoPtr = std::shared_ptr<ObjectInfo>;
using ObjectInfos = std::vector<ObjectInfoPtr>;
struct S3QueueObjectInfo : public ObjectInfo
struct ObjectStorageQueueObjectInfo : public ObjectInfo
{
S3QueueObjectInfo(
ObjectStorageQueueObjectInfo(
const ObjectInfo & object_info,
Metadata::FileMetadataPtr processing_holder_);
@ -44,7 +44,7 @@ public:
{
public:
FileIterator(
std::shared_ptr<S3QueueMetadata> metadata_,
std::shared_ptr<ObjectStorageQueueMetadata> metadata_,
std::unique_ptr<GlobIterator> glob_iterator_,
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
@ -57,10 +57,10 @@ public:
size_t estimatedKeysCount() override;
private:
using Bucket = S3QueueMetadata::Bucket;
using Processor = S3QueueMetadata::Processor;
using Bucket = ObjectStorageQueueMetadata::Bucket;
using Processor = ObjectStorageQueueMetadata::Processor;
const std::shared_ptr<S3QueueMetadata> metadata;
const std::shared_ptr<ObjectStorageQueueMetadata> metadata;
const std::unique_ptr<GlobIterator> glob_iterator;
std::atomic<bool> & shutdown_called;
@ -75,24 +75,24 @@ public:
};
std::unordered_map<Bucket, ListedKeys> listed_keys_cache;
bool iterator_finished = false;
std::unordered_map<size_t, S3QueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
std::unordered_map<size_t, ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr> bucket_holders;
std::pair<ObjectInfoPtr, S3QueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
std::pair<ObjectInfoPtr, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr> getNextKeyFromAcquiredBucket(size_t processor);
};
StorageS3QueueSource(
ObjectStorageQueueSource(
String name_,
size_t processor_id_,
const Block & header_,
std::unique_ptr<StorageObjectStorageSource> internal_source_,
std::shared_ptr<S3QueueMetadata> files_metadata_,
const S3QueueAction & action_,
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata_,
const ObjectStorageQueueAction & action_,
RemoveFileFunc remove_file_func_,
const NamesAndTypesList & requested_virtual_columns_,
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<S3QueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_);
@ -105,13 +105,13 @@ public:
private:
const String name;
const size_t processor_id;
const S3QueueAction action;
const std::shared_ptr<S3QueueMetadata> files_metadata;
const ObjectStorageQueueAction action;
const std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;
const std::shared_ptr<StorageObjectStorageSource> internal_source;
const NamesAndTypesList requested_virtual_columns;
const std::atomic<bool> & shutdown_called;
const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<S3QueueLog> s3_queue_log;
const std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
const StorageID storage_id;
RemoveFileFunc remove_file_func;
@ -122,10 +122,10 @@ private:
std::atomic<bool> initialized{false};
size_t processed_rows_from_file = 0;
S3QueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder;
ObjectStorageQueueOrderedFileMetadata::BucketHolderPtr current_bucket_holder;
void applyActionAfterProcessing(const String & path);
void appendLogElement(const std::string & filename, S3QueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void appendLogElement(const std::string & filename, ObjectStorageQueueMetadata::FileStatus & file_status_, size_t processed_rows, bool processed);
void lazyInitialize(size_t processor);
};

View File

@ -3,9 +3,9 @@
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
@ -20,33 +20,33 @@ namespace ErrorCodes
namespace
{
S3QueueMode modeFromString(const std::string & mode)
ObjectStorageQueueMode modeFromString(const std::string & mode)
{
if (mode == "ordered")
return S3QueueMode::ORDERED;
return ObjectStorageQueueMode::ORDERED;
if (mode == "unordered")
return S3QueueMode::UNORDERED;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected S3Queue mode: {}", mode);
return ObjectStorageQueueMode::UNORDERED;
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected ObjectStorageQueue mode: {}", mode);
}
}
S3QueueTableMetadata::S3QueueTableMetadata(
ObjectStorageQueueTableMetadata::ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const S3QueueSettings & engine_settings,
const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata)
{
format_name = configuration.format;
after_processing = engine_settings.after_processing.toString();
mode = engine_settings.mode.toString();
tracked_files_limit = engine_settings.s3queue_tracked_files_limit;
tracked_file_ttl_sec = engine_settings.s3queue_tracked_file_ttl_sec;
buckets = engine_settings.s3queue_buckets;
processing_threads_num = engine_settings.s3queue_processing_threads_num;
tracked_files_limit = engine_settings.tracked_files_limit;
tracked_file_ttl_sec = engine_settings.tracked_file_ttl_sec;
buckets = engine_settings.buckets;
processing_threads_num = engine_settings.processing_threads_num;
columns = storage_metadata.getColumns().toString();
}
String S3QueueTableMetadata::toString() const
String ObjectStorageQueueTableMetadata::toString() const
{
Poco::JSON::Object json;
json.set("after_processing", after_processing);
@ -65,7 +65,7 @@ String S3QueueTableMetadata::toString() const
return oss.str();
}
void S3QueueTableMetadata::read(const String & metadata_str)
void ObjectStorageQueueTableMetadata::read(const String & metadata_str)
{
Poco::JSON::Parser parser;
auto json = parser.parse(metadata_str).extract<Poco::JSON::Object::Ptr>();
@ -102,19 +102,19 @@ void S3QueueTableMetadata::read(const String & metadata_str)
buckets = json->getValue<UInt64>("buckets");
}
S3QueueTableMetadata S3QueueTableMetadata::parse(const String & metadata_str)
ObjectStorageQueueTableMetadata ObjectStorageQueueTableMetadata::parse(const String & metadata_str)
{
S3QueueTableMetadata metadata;
ObjectStorageQueueTableMetadata metadata;
metadata.read(metadata_str);
return metadata;
}
void S3QueueTableMetadata::checkEquals(const S3QueueTableMetadata & from_zk) const
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const
{
checkImmutableFieldsEquals(from_zk);
}
void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const
void ObjectStorageQueueTableMetadata::checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const
{
if (after_processing != from_zk.after_processing)
throw Exception(
@ -164,29 +164,29 @@ void S3QueueTableMetadata::checkImmutableFieldsEquals(const S3QueueTableMetadata
from_zk.last_processed_path,
last_processed_path);
if (modeFromString(mode) == S3QueueMode::ORDERED)
if (modeFromString(mode) == ObjectStorageQueueMode::ORDERED)
{
if (buckets != from_zk.buckets)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. "
"Existing table metadata in ZooKeeper differs in buckets setting. "
"Stored in ZooKeeper: {}, local: {}",
from_zk.buckets, buckets);
}
if (S3QueueMetadata::getBucketsNum(*this) != S3QueueMetadata::getBucketsNum(from_zk))
if (ObjectStorageQueueMetadata::getBucketsNum(*this) != ObjectStorageQueueMetadata::getBucketsNum(from_zk))
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}",
S3QueueMetadata::getBucketsNum(*this), S3QueueMetadata::getBucketsNum(from_zk));
ObjectStorageQueueMetadata::getBucketsNum(*this), ObjectStorageQueueMetadata::getBucketsNum(from_zk));
}
}
}
void S3QueueTableMetadata::checkEquals(const S3QueueSettings & current, const S3QueueSettings & expected)
void ObjectStorageQueueTableMetadata::checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected)
{
if (current.after_processing != expected.after_processing)
throw Exception(
@ -204,48 +204,48 @@ void S3QueueTableMetadata::checkEquals(const S3QueueSettings & current, const S3
expected.mode.toString(),
current.mode.toString());
if (current.s3queue_tracked_files_limit != expected.s3queue_tracked_files_limit)
if (current.tracked_files_limit != expected.tracked_files_limit)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set size. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_tracked_files_limit,
current.s3queue_tracked_files_limit);
expected.tracked_files_limit,
current.tracked_files_limit);
if (current.s3queue_tracked_file_ttl_sec != expected.s3queue_tracked_file_ttl_sec)
if (current.tracked_file_ttl_sec != expected.tracked_file_ttl_sec)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in max set age. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_tracked_file_ttl_sec,
current.s3queue_tracked_file_ttl_sec);
expected.tracked_file_ttl_sec,
current.tracked_file_ttl_sec);
if (current.s3queue_last_processed_path.value != expected.s3queue_last_processed_path.value)
if (current.last_processed_path.value != expected.last_processed_path.value)
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in last_processed_path. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_last_processed_path.value,
current.s3queue_last_processed_path.value);
expected.last_processed_path.value,
current.last_processed_path.value);
if (current.mode == S3QueueMode::ORDERED)
if (current.mode == ObjectStorageQueueMode::ORDERED)
{
if (current.s3queue_buckets != expected.s3queue_buckets)
if (current.buckets != expected.buckets)
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in s3queue_buckets setting. "
"Existing table metadata in ZooKeeper differs in buckets setting. "
"Stored in ZooKeeper: {}, local: {}",
expected.s3queue_buckets, current.s3queue_buckets);
expected.buckets, current.buckets);
}
if (S3QueueMetadata::getBucketsNum(current) != S3QueueMetadata::getBucketsNum(expected))
if (ObjectStorageQueueMetadata::getBucketsNum(current) != ObjectStorageQueueMetadata::getBucketsNum(expected))
{
throw Exception(
ErrorCodes::METADATA_MISMATCH,
"Existing table metadata in ZooKeeper differs in processing buckets. "
"Stored in ZooKeeper: {}, local: {}",
S3QueueMetadata::getBucketsNum(current), S3QueueMetadata::getBucketsNum(expected));
ObjectStorageQueueMetadata::getBucketsNum(current), ObjectStorageQueueMetadata::getBucketsNum(expected));
}
}
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <base/types.h>
@ -11,10 +11,10 @@ namespace DB
class WriteBuffer;
class ReadBuffer;
/** The basic parameters of S3Queue table engine for saving in ZooKeeper.
/** The basic parameters of ObjectStorageQueue table engine for saving in ZooKeeper.
* Lets you verify that they match local ones.
*/
struct S3QueueTableMetadata
struct ObjectStorageQueueTableMetadata
{
String format_name;
String columns;
@ -26,22 +26,22 @@ struct S3QueueTableMetadata
UInt64 processing_threads_num = 1;
String last_processed_path;
S3QueueTableMetadata() = default;
S3QueueTableMetadata(
ObjectStorageQueueTableMetadata() = default;
ObjectStorageQueueTableMetadata(
const StorageObjectStorage::Configuration & configuration,
const S3QueueSettings & engine_settings,
const ObjectStorageQueueSettings & engine_settings,
const StorageInMemoryMetadata & storage_metadata);
void read(const String & metadata_str);
static S3QueueTableMetadata parse(const String & metadata_str);
static ObjectStorageQueueTableMetadata parse(const String & metadata_str);
String toString() const;
void checkEquals(const S3QueueTableMetadata & from_zk) const;
static void checkEquals(const S3QueueSettings & current, const S3QueueSettings & expected);
void checkEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
static void checkEquals(const ObjectStorageQueueSettings & current, const ObjectStorageQueueSettings & expected);
private:
void checkImmutableFieldsEquals(const S3QueueTableMetadata & from_zk) const;
void checkImmutableFieldsEquals(const ObjectStorageQueueTableMetadata & from_zk) const;
};

View File

@ -1,4 +1,4 @@
#include <Storages/S3Queue/S3QueueUnorderedFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h>
#include <Common/getRandomASCIIString.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Interpreters/Context.h>
@ -18,13 +18,13 @@ namespace
}
}
S3QueueUnorderedFileMetadata::S3QueueUnorderedFileMetadata(
ObjectStorageQueueUnorderedFileMetadata::ObjectStorageQueueUnorderedFileMetadata(
const std::filesystem::path & zk_path,
const std::string & path_,
FileStatusPtr file_status_,
size_t max_loading_retries_,
LoggerPtr log_)
: S3QueueIFileMetadata(
: ObjectStorageQueueIFileMetadata(
path_,
/* processing_node_path */zk_path / "processing" / getNodeName(path_),
/* processed_node_path */zk_path / "processed" / getNodeName(path_),
@ -35,7 +35,7 @@ S3QueueUnorderedFileMetadata::S3QueueUnorderedFileMetadata(
{
}
std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueUnorderedFileMetadata::setProcessingImpl()
std::pair<bool, ObjectStorageQueueIFileMetadata::FileStatus::State> ObjectStorageQueueUnorderedFileMetadata::setProcessingImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType
@ -89,7 +89,7 @@ std::pair<bool, S3QueueIFileMetadata::FileStatus::State> S3QueueUnorderedFileMet
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected state of zookeeper transaction: {}", magic_enum::enum_name(code));
}
void S3QueueUnorderedFileMetadata::setProcessedAtStartRequests(
void ObjectStorageQueueUnorderedFileMetadata::setProcessedAtStartRequests(
Coordination::Requests & requests,
const zkutil::ZooKeeperPtr &)
{
@ -98,7 +98,7 @@ void S3QueueUnorderedFileMetadata::setProcessedAtStartRequests(
processed_node_path, node_metadata.toString(), zkutil::CreateMode::Persistent));
}
void S3QueueUnorderedFileMetadata::setProcessedImpl()
void ObjectStorageQueueUnorderedFileMetadata::setProcessedImpl()
{
/// In one zookeeper transaction do the following:
enum RequestType

View File

@ -1,17 +1,17 @@
#pragma once
#include <Storages/S3Queue/S3QueueIFileMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h>
#include <filesystem>
#include <Common/logger_useful.h>
namespace DB
{
class S3QueueUnorderedFileMetadata : public S3QueueIFileMetadata
class ObjectStorageQueueUnorderedFileMetadata : public ObjectStorageQueueIFileMetadata
{
public:
using Bucket = size_t;
explicit S3QueueUnorderedFileMetadata(
explicit ObjectStorageQueueUnorderedFileMetadata(
const std::filesystem::path & zk_path,
const std::string & path_,
FileStatusPtr file_status_,

View File

@ -1,10 +1,7 @@
#include <optional>
#include "config.h"
#include <Common/ProfileEvents.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Formats/FormatFactory.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTFunction.h>
@ -15,16 +12,15 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <Processors/Sources/NullSource.h>
#include <Storages/S3Queue/S3QueueTableMetadata.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/StorageFactory.h>
#include <Formats/FormatFactory.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/VirtualColumnUtils.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorage/Utils.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -32,12 +28,6 @@
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
}
namespace DB
{
@ -45,23 +35,22 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int S3_ERROR;
extern const int QUERY_NOT_ALLOWED;
}
namespace
{
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const S3QueueSettings & s3queue_settings)
std::string chooseZooKeeperPath(const StorageID & table_id, const Settings & settings, const ObjectStorageQueueSettings & queue_settings)
{
std::string zk_path_prefix = settings.s3queue_default_zookeeper_path.value;
if (zk_path_prefix.empty())
zk_path_prefix = "/";
std::string result_zk_path;
if (s3queue_settings.keeper_path.changed)
if (queue_settings.keeper_path.changed)
{
/// We do not add table uuid here on purpose.
result_zk_path = fs::path(zk_path_prefix) / s3queue_settings.keeper_path.value;
result_zk_path = fs::path(zk_path_prefix) / queue_settings.keeper_path.value;
}
else
{
@ -71,35 +60,35 @@ namespace
return zkutil::extractZooKeeperPath(result_zk_path, true);
}
void checkAndAdjustSettings(S3QueueSettings & s3queue_settings, const Settings & settings, bool is_attach)
void checkAndAdjustSettings(ObjectStorageQueueSettings & queue_settings, const Settings & settings, bool is_attach)
{
if (!is_attach && !s3queue_settings.mode.changed)
if (!is_attach && !queue_settings.mode.changed)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `mode` (Unordered/Ordered) is not specified, but is required.");
}
/// In case !is_attach, we leave Ordered mode as default for compatibility.
if (!s3queue_settings.s3queue_processing_threads_num)
if (!queue_settings.processing_threads_num)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `s3queue_processing_threads_num` cannot be set to zero");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `processing_threads_num` cannot be set to zero");
}
if (!s3queue_settings.s3queue_enable_logging_to_s3queue_log.changed)
if (!queue_settings.enable_logging_to_s3queue_log.changed)
{
s3queue_settings.s3queue_enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
queue_settings.enable_logging_to_s3queue_log = settings.s3queue_enable_logging_to_s3queue_log;
}
if (s3queue_settings.s3queue_cleanup_interval_min_ms > s3queue_settings.s3queue_cleanup_interval_max_ms)
if (queue_settings.cleanup_interval_min_ms > queue_settings.cleanup_interval_max_ms)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Setting `s3queue_cleanup_interval_min_ms` ({}) must be less or equal to `s3queue_cleanup_interval_max_ms` ({})",
s3queue_settings.s3queue_cleanup_interval_min_ms, s3queue_settings.s3queue_cleanup_interval_max_ms);
"Setting `cleanup_interval_min_ms` ({}) must be less or equal to `cleanup_interval_max_ms` ({})",
queue_settings.cleanup_interval_min_ms, queue_settings.cleanup_interval_max_ms);
}
}
}
StorageS3Queue::StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
StorageObjectStorageQueue::StorageObjectStorageQueue(
std::unique_ptr<ObjectStorageQueueSettings> queue_settings_,
const ConfigurationPtr configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -111,12 +100,12 @@ StorageS3Queue::StorageS3Queue(
LoadingStrictnessLevel mode)
: IStorage(table_id_)
, WithContext(context_)
, s3queue_settings(std::move(s3queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *s3queue_settings))
, queue_settings(std::move(queue_settings_))
, zk_path(chooseZooKeeperPath(table_id_, context_->getSettingsRef(), *queue_settings))
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(s3queue_settings->s3queue_polling_min_timeout_ms)
, log(getLogger("StorageS3Queue (" + table_id_.getFullTableName() + ")"))
, reschedule_processing_interval_ms(queue_settings->polling_min_timeout_ms)
, log(getLogger("StorageObjectStorageQueue (" + table_id_.getFullTableName() + ")"))
{
if (configuration->getPath().empty())
{
@ -128,10 +117,10 @@ StorageS3Queue::StorageS3Queue(
}
else if (!configuration->isPathWithGlobs())
{
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "S3Queue url must either end with '/' or contain globs");
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "ObjectStorageQueue url must either end with '/' or contain globs");
}
checkAndAdjustSettings(*s3queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
checkAndAdjustSettings(*queue_settings, context_->getSettingsRef(), mode > LoadingStrictnessLevel::CREATE);
object_storage = configuration->createObjectStorage(context_, /* is_readonly */true);
FormatFactory::instance().checkFormatName(configuration->format);
@ -149,30 +138,30 @@ StorageS3Queue::StorageS3Queue(
setInMemoryMetadata(storage_metadata);
LOG_INFO(log, "Using zookeeper path: {}", zk_path.string());
task = getContext()->getSchedulePool().createTask("S3QueueStreamingTask", [this] { threadFunc(); });
task = getContext()->getSchedulePool().createTask("ObjectStorageQueueStreamingTask", [this] { threadFunc(); });
/// Get metadata manager from S3QueueMetadataFactory,
/// Get metadata manager from ObjectStorageQueueMetadataFactory,
/// it will increase the ref count for the metadata object.
/// The ref count is decreased when StorageS3Queue::drop() method is called.
files_metadata = S3QueueMetadataFactory::instance().getOrCreate(zk_path, *s3queue_settings);
/// The ref count is decreased when StorageObjectStorageQueue::drop() method is called.
files_metadata = ObjectStorageQueueMetadataFactory::instance().getOrCreate(zk_path, *queue_settings);
try
{
files_metadata->initialize(configuration_, storage_metadata);
}
catch (...)
{
S3QueueMetadataFactory::instance().remove(zk_path);
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
throw;
}
}
void StorageS3Queue::startup()
void StorageObjectStorageQueue::startup()
{
if (task)
task->activateAndSchedule();
}
void StorageS3Queue::shutdown(bool is_drop)
void StorageObjectStorageQueue::shutdown(bool is_drop)
{
table_is_being_dropped = is_drop;
shutdown_called = true;
@ -191,31 +180,31 @@ void StorageS3Queue::shutdown(bool is_drop)
LOG_TRACE(log, "Shut down storage");
}
void StorageS3Queue::drop()
void StorageObjectStorageQueue::drop()
{
S3QueueMetadataFactory::instance().remove(zk_path);
ObjectStorageQueueMetadataFactory::instance().remove(zk_path);
}
bool StorageS3Queue::supportsSubsetOfColumns(const ContextPtr & context_) const
bool StorageObjectStorageQueue::supportsSubsetOfColumns(const ContextPtr & context_) const
{
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context_, format_settings);
}
class ReadFromS3Queue : public SourceStepWithFilter
class ReadFromObjectStorageQueue : public SourceStepWithFilter
{
public:
std::string getName() const override { return "ReadFromS3Queue"; }
std::string getName() const override { return "ReadFromObjectStorageQueue"; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void applyFilters(ActionDAGNodes added_filter_nodes) override;
ReadFromS3Queue(
ReadFromObjectStorageQueue(
const Names & column_names_,
const SelectQueryInfo & query_info_,
const StorageSnapshotPtr & storage_snapshot_,
const ContextPtr & context_,
Block sample_block,
ReadFromFormatInfo info_,
std::shared_ptr<StorageS3Queue> storage_,
std::shared_ptr<StorageObjectStorageQueue> storage_,
size_t max_block_size_)
: SourceStepWithFilter(
DataStream{.header = std::move(sample_block)},
@ -231,15 +220,15 @@ public:
private:
ReadFromFormatInfo info;
std::shared_ptr<StorageS3Queue> storage;
std::shared_ptr<StorageObjectStorageQueue> storage;
size_t max_block_size;
std::shared_ptr<StorageS3Queue::FileIterator> iterator;
std::shared_ptr<StorageObjectStorageQueue::FileIterator> iterator;
void createIterator(const ActionsDAG::Node * predicate);
};
void ReadFromS3Queue::createIterator(const ActionsDAG::Node * predicate)
void ReadFromObjectStorageQueue::createIterator(const ActionsDAG::Node * predicate)
{
if (iterator)
return;
@ -248,7 +237,7 @@ void ReadFromS3Queue::createIterator(const ActionsDAG::Node * predicate)
}
void ReadFromS3Queue::applyFilters(ActionDAGNodes added_filter_nodes)
void ReadFromObjectStorageQueue::applyFilters(ActionDAGNodes added_filter_nodes)
{
SourceStepWithFilter::applyFilters(std::move(added_filter_nodes));
@ -259,7 +248,7 @@ void ReadFromS3Queue::applyFilters(ActionDAGNodes added_filter_nodes)
createIterator(predicate);
}
void StorageS3Queue::read(
void StorageObjectStorageQueue::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
@ -281,10 +270,10 @@ void StorageS3Queue::read(
"Cannot read from {} with attached materialized views", getName());
}
auto this_ptr = std::static_pointer_cast<StorageS3Queue>(shared_from_this());
auto this_ptr = std::static_pointer_cast<StorageObjectStorageQueue>(shared_from_this());
auto read_from_format_info = prepareReadingFromFormat(column_names, storage_snapshot, supportsSubsetOfColumns(local_context));
auto reading = std::make_unique<ReadFromS3Queue>(
auto reading = std::make_unique<ReadFromObjectStorageQueue>(
column_names,
query_info,
storage_snapshot,
@ -297,10 +286,10 @@ void StorageS3Queue::read(
query_plan.addStep(std::move(reading));
}
void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
void ReadFromObjectStorageQueue::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
const size_t adjusted_num_streams = storage->s3queue_settings->s3queue_processing_threads_num;
const size_t adjusted_num_streams = storage->queue_settings->processing_threads_num;
createIterator(nullptr);
for (size_t i = 0; i < adjusted_num_streams; ++i)
@ -320,10 +309,10 @@ void ReadFromS3Queue::initializePipeline(QueryPipelineBuilder & pipeline, const
pipeline.init(std::move(pipe));
}
std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSource(
size_t processor_id,
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
std::shared_ptr<StorageObjectStorageQueue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context)
{
@ -343,14 +332,14 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
{
object_storage->removeObject(StoredObject(path));
};
auto s3_queue_log = s3queue_settings->s3queue_enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<StorageS3QueueSource>(
auto s3_queue_log = queue_settings->enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
return std::make_shared<ObjectStorageQueueSource>(
getName(),
processor_id,
info.source_header,
std::move(internal_source),
files_metadata,
s3queue_settings->after_processing,
queue_settings->after_processing,
file_deleter,
info.requested_virtual_columns,
local_context,
@ -361,7 +350,7 @@ std::shared_ptr<StorageS3QueueSource> StorageS3Queue::createSource(
log);
}
bool StorageS3Queue::hasDependencies(const StorageID & table_id)
bool StorageObjectStorageQueue::hasDependencies(const StorageID & table_id)
{
// Check if all dependencies are attached
auto view_ids = DatabaseCatalog::instance().getDependentViews(table_id);
@ -386,7 +375,7 @@ bool StorageS3Queue::hasDependencies(const StorageID & table_id)
return true;
}
void StorageS3Queue::threadFunc()
void StorageObjectStorageQueue::threadFunc()
{
if (shutdown_called)
return;
@ -404,12 +393,12 @@ void StorageS3Queue::threadFunc()
if (streamToViews())
{
/// Reset the reschedule interval.
reschedule_processing_interval_ms = s3queue_settings->s3queue_polling_min_timeout_ms;
reschedule_processing_interval_ms = queue_settings->polling_min_timeout_ms;
}
else
{
/// Increase the reschedule interval.
reschedule_processing_interval_ms += s3queue_settings->s3queue_polling_backoff_ms;
reschedule_processing_interval_ms += queue_settings->polling_backoff_ms;
}
LOG_DEBUG(log, "Stopped streaming to {} attached views", dependencies_count);
@ -426,12 +415,12 @@ void StorageS3Queue::threadFunc()
if (!shutdown_called)
{
LOG_TRACE(log, "Reschedule S3 Queue processing thread in {} ms", reschedule_processing_interval_ms);
LOG_TRACE(log, "Reschedule processing thread in {} ms", reschedule_processing_interval_ms);
task->scheduleAfter(reschedule_processing_interval_ms);
}
}
bool StorageS3Queue::streamToViews()
bool StorageObjectStorageQueue::streamToViews()
{
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
@ -444,29 +433,29 @@ bool StorageS3Queue::streamToViews()
auto insert = std::make_shared<ASTInsertQuery>();
insert->table_id = table_id;
auto s3queue_context = Context::createCopy(getContext());
s3queue_context->makeQueryContext();
auto context = Context::createCopy(getContext());
context->makeQueryContext();
// Create a stream for each consumer and join them in a union stream
// Only insert into dependent views and expect that input blocks contain virtual columns
InterpreterInsertQuery interpreter(insert, s3queue_context, false, true, true);
InterpreterInsertQuery interpreter(insert, context, false, true, true);
auto block_io = interpreter.execute();
auto file_iterator = createFileIterator(s3queue_context, nullptr);
auto file_iterator = createFileIterator(context, nullptr);
auto read_from_format_info = prepareReadingFromFormat(block_io.pipeline.getHeader().getNames(), storage_snapshot, supportsSubsetOfColumns(s3queue_context));
auto read_from_format_info = prepareReadingFromFormat(block_io.pipeline.getHeader().getNames(), storage_snapshot, supportsSubsetOfColumns(context));
Pipes pipes;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
pipes.reserve(queue_settings->processing_threads_num);
for (size_t i = 0; i < queue_settings->processing_threads_num; ++i)
{
auto source = createSource(i, read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
auto source = createSource(i, read_from_format_info, file_iterator, DBMS_DEFAULT_BUFFER_SIZE, context);
pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);
block_io.pipeline.setNumThreads(queue_settings->processing_threads_num);
block_io.pipeline.setConcurrencyControl(context->getSettingsRef().use_concurrency_control);
std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });
@ -477,12 +466,12 @@ bool StorageS3Queue::streamToViews()
return rows > 0;
}
zkutil::ZooKeeperPtr StorageS3Queue::getZooKeeper() const
zkutil::ZooKeeperPtr StorageObjectStorageQueue::getZooKeeper() const
{
return getContext()->getZooKeeper();
}
std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
std::shared_ptr<StorageObjectStorageQueue::FileIterator> StorageObjectStorageQueue::createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate)
{
auto settings = configuration->getQuerySettings(local_context);
auto glob_iterator = std::make_unique<StorageObjectStorageSource::GlobIterator>(
@ -491,73 +480,4 @@ std::shared_ptr<StorageS3Queue::FileIterator> StorageS3Queue::createFileIterator
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called, log);
}
#if USE_AWS_S3
void registerStorageS3Queue(StorageFactory & factory)
{
factory.registerStorage(
"S3Queue",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto s3queue_settings = std::make_unique<S3QueueSettings>();
if (args.storage_def->settings)
{
s3queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
else
LOG_TRACE(getLogger("StorageS3"), "Remove: {}", change.name);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageS3Queue>(
std::move(s3queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
#endif
}

View File

@ -5,25 +5,24 @@
#include <Common/logger_useful.h>
#include <Core/BackgroundSchedulePool.h>
#include <Storages/IStorage.h>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueSource.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Interpreters/Context.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <Storages/StorageFactory.h>
namespace DB
{
class S3QueueMetadata;
class ObjectStorageQueueMetadata;
class StorageS3Queue : public IStorage, WithContext
class StorageObjectStorageQueue : public IStorage, WithContext
{
public:
using ConfigurationPtr = StorageObjectStorage::ConfigurationPtr;
StorageS3Queue(
std::unique_ptr<S3QueueSettings> s3queue_settings_,
StorageObjectStorageQueue(
std::unique_ptr<ObjectStorageQueueSettings> queue_settings_,
ConfigurationPtr configuration_,
const StorageID & table_id_,
const ColumnsDescription & columns_,
@ -34,7 +33,7 @@ public:
ASTStorage * engine_args,
LoadingStrictnessLevel mode);
String getName() const override { return "S3Queue"; }
String getName() const override { return "ObjectStorageQueue"; }
void read(
QueryPlan & query_plan,
@ -53,13 +52,13 @@ public:
zkutil::ZooKeeperPtr getZooKeeper() const;
private:
friend class ReadFromS3Queue;
using FileIterator = StorageS3QueueSource::FileIterator;
friend class ReadFromObjectStorageQueue;
using FileIterator = ObjectStorageQueueSource::FileIterator;
const std::unique_ptr<S3QueueSettings> s3queue_settings;
const std::unique_ptr<ObjectStorageQueueSettings> queue_settings;
const fs::path zk_path;
std::shared_ptr<S3QueueMetadata> files_metadata;
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;
ConfigurationPtr configuration;
ObjectStoragePtr object_storage;
@ -83,10 +82,10 @@ private:
bool supportsDynamicSubcolumns() const override { return true; }
std::shared_ptr<FileIterator> createFileIterator(ContextPtr local_context, const ActionsDAG::Node * predicate);
std::shared_ptr<StorageS3QueueSource> createSource(
std::shared_ptr<ObjectStorageQueueSource> createSource(
size_t processor_id,
const ReadFromFormatInfo & info,
std::shared_ptr<StorageS3Queue::FileIterator> file_iterator,
std::shared_ptr<StorageObjectStorageQueue::FileIterator> file_iterator,
size_t max_block_size,
ContextPtr local_context);

View File

@ -0,0 +1,87 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Formats/FormatFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void registerStorageS3Queue(StorageFactory & factory)
{
factory.registerStorage(
"S3Queue",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto queue_settings = std::make_unique<ObjectStorageQueueSettings>();
if (args.storage_def->settings)
{
queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageObjectStorageQueue>(
std::move(queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::S3,
});
}
}
#endif

View File

@ -1,37 +0,0 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Storages/S3Queue/S3QueueSettings.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
namespace DB
{
class S3QueueMetadataFactory final : private boost::noncopyable
{
public:
using FilesMetadataPtr = std::shared_ptr<S3QueueMetadata>;
static S3QueueMetadataFactory & instance();
FilesMetadataPtr getOrCreate(const std::string & zookeeper_path, const S3QueueSettings & settings);
void remove(const std::string & zookeeper_path);
std::unordered_map<std::string, FilesMetadataPtr> getAll();
private:
struct Metadata
{
explicit Metadata(std::shared_ptr<S3QueueMetadata> metadata_) : metadata(metadata_), ref_count(1) {}
std::shared_ptr<S3QueueMetadata> metadata;
/// TODO: the ref count should be kept in keeper, because of the case with distributed processing.
size_t ref_count = 0;
};
using MetadataByPath = std::unordered_map<std::string, Metadata>;
MetadataByPath metadata_by_path;
std::mutex mutex;
};
}

View File

@ -1,47 +0,0 @@
#pragma once
#include <Core/BaseSettings.h>
#include <Core/Settings.h>
#include <Core/SettingsEnums.h>
namespace DB
{
class ASTStorage;
#define S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
M(S3QueueMode, \
mode, \
S3QueueMode::ORDERED, \
"With unordered mode, the set of all already processed files is tracked with persistent nodes in ZooKepeer." \
"With ordered mode, only the max name of the successfully consumed file stored.", \
0) \
M(S3QueueAction, after_processing, S3QueueAction::KEEP, "Delete or keep file in S3 after successful processing", 0) \
M(String, keeper_path, "", "Zookeeper node path", 0) \
M(UInt32, s3queue_loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, s3queue_processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, s3queue_enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(String, s3queue_last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, s3queue_tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, s3queue_polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
M(UInt32, s3queue_polling_max_timeout_ms, 10000, "Maximum timeout before next polling", 0) \
M(UInt32, s3queue_polling_backoff_ms, 1000, "Polling backoff", 0) \
M(UInt32, s3queue_tracked_files_limit, 1000, "For unordered mode. Max set size for tracking processed files in ZooKeeper", 0) \
M(UInt32, s3queue_cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
M(UInt32, s3queue_cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
M(UInt32, s3queue_buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
#define LIST_OF_S3QUEUE_SETTINGS(M, ALIAS) \
S3QUEUE_RELATED_SETTINGS(M, ALIAS) \
LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS)
DECLARE_SETTINGS_TRAITS(S3QueueSettingsTraits, LIST_OF_S3QUEUE_SETTINGS)
struct S3QueueSettings : public BaseSettings<S3QueueSettingsTraits>
{
void loadFromQuery(ASTStorage & storage_def);
};
}

View File

@ -11,9 +11,9 @@
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/S3Queue/S3QueueMetadata.h>
#include <Storages/S3Queue/S3QueueMetadataFactory.h>
#include <Storages/S3Queue/StorageS3Queue.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Disks/IDisk.h>
@ -43,7 +43,7 @@ StorageSystemS3Queue::StorageSystemS3Queue(const StorageID & table_id_)
void StorageSystemS3Queue::fillData(MutableColumns & res_columns, ContextPtr, const ActionsDAG::Node *, std::vector<UInt8>) const
{
for (const auto & [zookeeper_path, metadata] : S3QueueMetadataFactory::instance().getAll())
for (const auto & [zookeeper_path, metadata] : ObjectStorageQueueMetadataFactory::instance().getAll())
{
for (const auto & [file_name, file_status] : metadata->getFileStatuses())
{

View File

@ -778,10 +778,10 @@ def test_max_set_age(started_cluster):
files_path,
additional_settings={
"keeper_path": keeper_path,
"s3queue_tracked_file_ttl_sec": max_age,
"s3queue_cleanup_interval_min_ms": 0,
"s3queue_cleanup_interval_max_ms": 0,
"s3queue_loading_retries": 0,
"tracked_file_ttl_sec": max_age,
"cleanup_interval_min_ms": 0,
"cleanup_interval_max_ms": 0,
"loading_retries": 0,
},
)
create_mv(node, table_name, dst_table_name)