Add AzureQueue

kssenii 2024-06-19 16:16:37 +02:00
parent 78ccd03dd6
commit c47c8d603e
17 changed files with 200 additions and 110 deletions
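
For reference, a minimal usage sketch of the new engine (illustrative only: the argument order — connection string, container, path, format — mirrors the engine_def built in the integration test below; the connection string, container and path are placeholders):

    CREATE TABLE azure_queue_example (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = AzureQueue('<azurite_connection_string>', 'cont', 'example_data/', 'CSV')
    SETTINGS mode = 'unordered';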

View File

@ -635,11 +635,11 @@ The server successfully detected this situation and will download merged part fr
M(S3QueueSetFileProcessingMicroseconds, "Time spent to set file as processing")\
M(S3QueueSetFileProcessedMicroseconds, "Time spent to set file as processed")\
M(S3QueueSetFileFailedMicroseconds, "Time spent to set file as failed")\
M(S3QueueFailedFiles, "Number of files which failed to be processed")\
M(S3QueueProcessedFiles, "Number of files which were processed")\
M(S3QueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent in background cleanup of the tracked files set (by max set size or TTL)")\
M(S3QueuePullMicroseconds, "Time spent to read file data")\
M(S3QueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
M(ObjectStorageQueueFailedFiles, "Number of files which failed to be processed")\
M(ObjectStorageQueueProcessedFiles, "Number of files which were processed")\
M(ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds, "Time spent in background cleanup of the tracked files set (by max set size or TTL)")\
M(ObjectStorageQueuePullMicroseconds, "Time spent to read file data")\
M(ObjectStorageQueueLockLocalFileStatusesMicroseconds, "Time spent to lock local file statuses")\
\
M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
M(IOUringSQEsSubmitted, "Total number of io_uring SQEs submitted") \
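
The renamed counters remain observable through system.events once they have been incremented at least once; a quick sanity check after the rename (sketch, names taken from the hunk above):

    SELECT event, value, description
    FROM system.events
    WHERE event LIKE 'ObjectStorageQueue%';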

View File

@ -10,7 +10,7 @@
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/FilesystemReadPrefetchesLog.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/ZooKeeperLog.h>

View File

@ -4127,6 +4127,15 @@ std::shared_ptr<ObjectStorageQueueLog> Context::getS3QueueLog() const
return shared->system_logs->s3_queue_log;
}
std::shared_ptr<ObjectStorageQueueLog> Context::getAzureQueueLog() const
{
SharedLockGuard lock(shared->mutex);
if (!shared->system_logs)
return {};
return shared->system_logs->azure_queue_log;
}
std::shared_ptr<FilesystemReadPrefetchesLog> Context::getFilesystemReadPrefetchesLog() const
{
SharedLockGuard lock(shared->mutex);

View File

@ -1107,6 +1107,7 @@ public:
std::shared_ptr<ProcessorsProfileLog> getProcessorsProfileLog() const;
std::shared_ptr<FilesystemCacheLog> getFilesystemCacheLog() const;
std::shared_ptr<ObjectStorageQueueLog> getS3QueueLog() const;
std::shared_ptr<ObjectStorageQueueLog> getAzureQueueLog() const;
std::shared_ptr<FilesystemReadPrefetchesLog> getFilesystemReadPrefetchesLog() const;
std::shared_ptr<AsynchronousInsertLog> getAsynchronousInsertLog() const;
std::shared_ptr<BackupLog> getBackupLog() const;

View File

@ -8,7 +8,7 @@
#include <DataTypes/DataTypeMap.h>
#include <Interpreters/ProfileEventsExt.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
namespace DB

View File

@ -24,7 +24,7 @@
#include <Interpreters/QueryLog.h>
#include <Interpreters/QueryThreadLog.h>
#include <Interpreters/QueryViewsLog.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
#include <Interpreters/SessionLog.h>
#include <Interpreters/TextLog.h>
#include <Interpreters/TraceLog.h>
@ -304,6 +304,7 @@ SystemLogs::SystemLogs(ContextPtr global_context, const Poco::Util::AbstractConf
asynchronous_insert_log = createSystemLog<AsynchronousInsertLog>(global_context, "system", "asynchronous_insert_log", config, "asynchronous_insert_log", "Contains a history for all asynchronous inserts executed on current server.");
backup_log = createSystemLog<BackupLog>(global_context, "system", "backup_log", config, "backup_log", "Contains logging entries with the information about BACKUP and RESTORE operations.");
s3_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "s3queue_log", config, "s3queue_log", "Contains logging entries with information about files processed by the S3Queue engine.");
azure_queue_log = createSystemLog<ObjectStorageQueueLog>(global_context, "system", "azure_queue_log", config, "azure_queue_log", "Contains logging entries with information about files processed by the AzureQueue engine.");
blob_storage_log = createSystemLog<BlobStorageLog>(global_context, "system", "blob_storage_log", config, "blob_storage_log", "Contains logging entries with information about various blob storage operations such as uploads and deletes.");
if (query_log)
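
As with the other system logs, the new table is only created when an <azure_queue_log> section is present in the server config. A hedged way to inspect it, assuming the element schema matches the existing system.s3queue_log (file_name, rows_processed, status, exception, event_time):

    SYSTEM FLUSH LOGS;
    SELECT file_name, rows_processed, status, exception
    FROM system.azure_queue_log
    ORDER BY event_time DESC
    LIMIT 10;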

View File

@ -75,6 +75,7 @@ struct SystemLogs
std::shared_ptr<FilesystemCacheLog> filesystem_cache_log;
std::shared_ptr<FilesystemReadPrefetchesLog> filesystem_read_prefetches_log;
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
std::shared_ptr<ObjectStorageQueueLog> azure_queue_log;
/// Metrics from system.asynchronous_metrics.
std::shared_ptr<AsynchronousMetricLog> asynchronous_metric_log;
/// OpenTelemetry trace spans.

View File

@ -11,8 +11,8 @@
namespace ProfileEvents
{
extern const Event S3QueueProcessedFiles;
extern const Event S3QueueFailedFiles;
extern const Event ObjectStorageQueueProcessedFiles;
extern const Event ObjectStorageQueueFailedFiles;
};
namespace DB
@ -169,7 +169,7 @@ ObjectStorageQueueIFileMetadata::NodeMetadata ObjectStorageQueueIFileMetadata::c
/// Since node name is just a hash we want to know to which file it corresponds,
/// so we keep "file_path" in nodes data.
/// "last_processed_timestamp" is needed for TTL metadata nodes enabled by tracked_file_ttl_sec.
/// "last_exception" is kept for introspection, should also be visible in system.s3queue_log if it is enabled.
/// "last_exception" is kept for introspection, should also be visible in system.s3(azure)queue_log if it is enabled.
/// "retries" is kept for retrying the processing enabled by loading_retries.
NodeMetadata metadata;
metadata.file_path = path;
@ -225,7 +225,7 @@ void ObjectStorageQueueIFileMetadata::setProcessed()
{
LOG_TRACE(log, "Setting file {} as processed (path: {})", path, processed_node_path);
ProfileEvents::increment(ProfileEvents::S3QueueProcessedFiles);
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueProcessedFiles);
file_status->onProcessed();
setProcessedImpl();
@ -239,7 +239,7 @@ void ObjectStorageQueueIFileMetadata::setFailed(const std::string & exception)
{
LOG_TRACE(log, "Setting file {} as failed (exception: {}, path: {})", path, exception, failed_node_path);
ProfileEvents::increment(ProfileEvents::S3QueueFailedFiles);
ProfileEvents::increment(ProfileEvents::ObjectStorageQueueFailedFiles);
file_status->onFailed(exception);
node_metadata.last_exception = exception;

View File

@ -21,13 +21,8 @@
namespace ProfileEvents
{
extern const Event S3QueueSetFileProcessingMicroseconds;
extern const Event S3QueueSetFileProcessedMicroseconds;
extern const Event S3QueueSetFileFailedMicroseconds;
extern const Event S3QueueFailedFiles;
extern const Event S3QueueProcessedFiles;
extern const Event S3QueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event S3QueueLockLocalFileStatusesMicroseconds;
extern const Event ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds;
extern const Event ObjectStorageQueueLockLocalFileStatusesMicroseconds;
};
namespace DB
@ -108,7 +103,7 @@ private:
std::unique_lock<std::mutex> lock() const
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueLockLocalFileStatusesMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueueLockLocalFileStatusesMicroseconds);
return std::unique_lock(mutex);
}
};
@ -316,7 +311,7 @@ void ObjectStorageQueueMetadata::cleanupThreadFunc()
void ObjectStorageQueueMetadata::cleanupThreadFuncImpl()
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueueCleanupMaxSetSizeOrTTLMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueueCleanupMaxSetSizeOrTTLMicroseconds);
const auto zk_client = getZooKeeper();
const fs::path zookeeper_processed_path = zookeeper_path / "processed";
const fs::path zookeeper_failed_path = zookeeper_path / "failed";

View File

@ -22,6 +22,7 @@ class ASTStorage;
M(UInt32, loading_retries, 0, "Retry loading up to specified number of times", 0) \
M(UInt32, processing_threads_num, 1, "Number of processing threads", 0) \
M(UInt32, enable_logging_to_s3queue_log, 1, "Enable logging to system table system.s3queue_log", 0) \
M(UInt32, enable_logging_to_azure_queue_log, 1, "Enable logging to system table system.azure_queue_log", 0) \
M(String, last_processed_path, "", "For Ordered mode. Files that have lexicographically smaller file name are considered already processed", 0) \
M(UInt32, tracked_file_ttl_sec, 0, "Maximum number of seconds to store processed files in ZooKeeper node (store forever by default)", 0) \
M(UInt32, polling_min_timeout_ms, 1000, "Minimal timeout before next polling", 0) \
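
A sketch of how these settings appear in DDL (setting names from the macro list above; mode is assumed from the test's AVAILABLE_MODES, and all values are illustrative):

    CREATE TABLE azure_queue_tuned (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = AzureQueue('<connection_string>', 'cont', 'data/', 'CSV')
    SETTINGS mode = 'ordered',
             processing_threads_num = 4,
             loading_retries = 3,
             tracked_file_ttl_sec = 3600,
             polling_min_timeout_ms = 1000,
             enable_logging_to_azure_queue_log = 1;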

View File

@ -12,7 +12,7 @@
namespace ProfileEvents
{
extern const Event S3QueuePullMicroseconds;
extern const Event ObjectStorageQueuePullMicroseconds;
}
namespace DB
@ -81,7 +81,7 @@ ObjectStorageQueueSource::ObjectInfoPtr ObjectStorageQueueSource::FileIterator::
std::pair<ObjectStorageQueueSource::ObjectInfoPtr, ObjectStorageQueueOrderedFileMetadata::BucketInfoPtr>
ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t processor)
{
/// We need this lock to maintain consistency between listing s3 directory
/// We need this lock to maintain consistency between listing object storage directory
/// and getting/putting result into listed_keys_cache.
std::lock_guard lock(buckets_mutex);
@ -98,7 +98,7 @@ ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t proc
/// and checks if corresponding bucket is already acquired by someone.
/// In case it is already acquired, they put the key into listed_keys_cache,
/// so that the thread who acquired the bucket will be able to see
/// those keys without the need to list s3 directory once again.
/// those keys without the need to list object storage directory once again.
if (bucket_holder_it->second)
{
const auto bucket = bucket_holder_it->second->getBucket();
@ -155,7 +155,7 @@ ObjectStorageQueueSource::FileIterator::getNextKeyFromAcquiredBucket(size_t proc
}
}
/// If processing thread has already acquired some bucket
/// and while listing s3 directory gets a key which is in a different bucket,
/// and while listing object storage directory gets a key which is in a different bucket,
/// it puts the key into listed_keys_cache to allow others to process it,
/// because one processing thread can acquire only one bucket at a time.
/// Once a thread is finished with its acquired bucket, it checks listed_keys_cache
@ -292,7 +292,7 @@ ObjectStorageQueueSource::ObjectStorageQueueSource(
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> system_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_)
: ISource(header_)
@ -305,7 +305,7 @@ ObjectStorageQueueSource::ObjectStorageQueueSource(
, requested_virtual_columns(requested_virtual_columns_)
, shutdown_called(shutdown_called_)
, table_is_being_dropped(table_is_being_dropped_)
, s3_queue_log(s3_queue_log_)
, system_queue_log(system_queue_log_)
, storage_id(storage_id_)
, remove_file_func(remove_file_func_)
, log(log_)
@ -400,11 +400,11 @@ Chunk ObjectStorageQueueSource::generate()
auto * prev_scope = CurrentThread::get().attachProfileCountersScope(&file_status->profile_counters);
SCOPE_EXIT({ CurrentThread::get().attachProfileCountersScope(prev_scope); });
/// FIXME: if files are compressed, profile counters update does not work fully (s3 related counters are not saved). Why?
/// FIXME: if files are compressed, profile counters update does not work fully (object storage related counters are not saved). Why?
try
{
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::S3QueuePullMicroseconds);
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::ObjectStorageQueuePullMicroseconds);
Chunk chunk;
if (reader->pull(chunk))
@ -487,7 +487,7 @@ void ObjectStorageQueueSource::appendLogElement(
size_t processed_rows,
bool processed)
{
if (!s3_queue_log)
if (!system_queue_log)
return;
ObjectStorageQueueLogElement elem{};
@ -507,7 +507,7 @@ void ObjectStorageQueueSource::appendLogElement(
.exception = file_status_.getException(),
};
}
s3_queue_log->add(std::move(elem));
system_queue_log->add(std::move(elem));
}
}

View File

@ -6,7 +6,7 @@
#include <Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h>
#include <Storages/ObjectStorage/StorageObjectStorage.h>
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
#include <Interpreters/S3QueueLog.h>
#include <Interpreters/ObjectStorageQueueLog.h>
namespace Poco { class Logger; }
@ -49,9 +49,6 @@ public:
std::atomic<bool> & shutdown_called_,
LoggerPtr logger_);
/// Note:
/// List results in s3 are always returned in UTF-8 binary order.
/// (https://docs.aws.amazon.com/AmazonS3/latest/userguide/ListingKeysUsingAPIs.html)
ObjectInfoPtr nextImpl(size_t processor) override;
size_t estimatedKeysCount() override;
@ -92,7 +89,7 @@ public:
ContextPtr context_,
const std::atomic<bool> & shutdown_called_,
const std::atomic<bool> & table_is_being_dropped_,
std::shared_ptr<ObjectStorageQueueLog> s3_queue_log_,
std::shared_ptr<ObjectStorageQueueLog> system_queue_log_,
const StorageID & storage_id_,
LoggerPtr log_);
@ -111,7 +108,7 @@ private:
const NamesAndTypesList requested_virtual_columns;
const std::atomic<bool> & shutdown_called;
const std::atomic<bool> & table_is_being_dropped;
const std::shared_ptr<ObjectStorageQueueLog> s3_queue_log;
const std::shared_ptr<ObjectStorageQueueLog> system_queue_log;
const StorageID storage_id;
RemoveFileFunc remove_file_func;

View File

@ -105,7 +105,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
, configuration{configuration_}
, format_settings(format_settings_)
, reschedule_processing_interval_ms(queue_settings->polling_min_timeout_ms)
, log(getLogger("StorageObjectStorageQueue (" + table_id_.getFullTableName() + ")"))
, log(getLogger(fmt::format("{}Queue ({})", configuration->getEngineName(), table_id_.getFullTableName())))
{
if (configuration->getPath().empty())
{
@ -332,7 +332,10 @@ std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSourc
{
object_storage->removeObject(StoredObject(path));
};
auto s3_queue_log = queue_settings->enable_logging_to_s3queue_log ? local_context->getS3QueueLog() : nullptr;
auto system_queue_log = queue_settings->enable_logging_to_s3queue_log
? local_context->getS3QueueLog()
: queue_settings->enable_logging_to_azure_queue_log ? local_context->getAzureQueueLog() : nullptr;
return std::make_shared<ObjectStorageQueueSource>(
getName(),
processor_id,
@ -345,7 +348,7 @@ std::shared_ptr<ObjectStorageQueueSource> StorageObjectStorageQueue::createSourc
local_context,
shutdown_called,
table_is_being_dropped,
s3_queue_log,
system_queue_log,
getStorageID(),
log);
}
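
Reading the ternary above: enable_logging_to_s3queue_log is checked first and both flags default to 1, so an AzureQueue table falls through to getAzureQueueLog() only when the s3 flag is switched off. A hedged sketch of that configuration (engine arguments are placeholders):

    CREATE TABLE azure_queue_only_azure_log (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = AzureQueue('<connection_string>', 'cont', 'data/', 'CSV')
    SETTINGS mode = 'unordered',
             enable_logging_to_s3queue_log = 0,
             enable_logging_to_azure_queue_log = 1;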

View File

@ -1,14 +1,19 @@
#include "config.h"
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSettings.h>
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
#include <Formats/FormatFactory.h>
#if USE_AWS_S3
#include <IO/S3Common.h>
#include <Storages/ObjectStorage/S3/Configuration.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <Storages/ObjectStorage/Azure/Configuration.h>
#endif
namespace DB
{
@ -17,64 +22,71 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
template <typename Configuration>
StoragePtr createQueueStorage(const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto queue_settings = std::make_unique<ObjectStorageQueueSettings>();
if (args.storage_def->settings)
{
queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageObjectStorageQueue>(
std::move(queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
}
#if USE_AWS_S3
void registerStorageS3Queue(StorageFactory & factory)
{
factory.registerStorage(
"S3Queue",
[](const StorageFactory::Arguments & args)
{
auto & engine_args = args.engine_args;
if (engine_args.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "External data source must have arguments");
auto configuration = std::make_shared<StorageS3Configuration>();
StorageObjectStorage::Configuration::initialize(*configuration, args.engine_args, args.getContext(), false);
// Use format settings from global server context + settings from
// the SETTINGS clause of the create query. Settings from current
// session and user are ignored.
std::optional<FormatSettings> format_settings;
auto queue_settings = std::make_unique<ObjectStorageQueueSettings>();
if (args.storage_def->settings)
{
queue_settings->loadFromQuery(*args.storage_def);
FormatFactorySettings user_format_settings;
// Apply changed settings from global context, but ignore the
// unknown ones, because we only have the format settings here.
const auto & changes = args.getContext()->getSettingsRef().changes();
for (const auto & change : changes)
{
if (user_format_settings.has(change.name))
user_format_settings.set(change.name, change.value);
args.storage_def->settings->changes.removeSetting(change.name);
}
for (const auto & change : args.storage_def->settings->changes)
{
if (user_format_settings.has(change.name))
user_format_settings.applyChange(change);
}
format_settings = getFormatSettings(args.getContext(), user_format_settings);
}
else
{
format_settings = getFormatSettings(args.getContext());
}
return std::make_shared<StorageObjectStorageQueue>(
std::move(queue_settings),
std::move(configuration),
args.table_id,
args.columns,
args.constraints,
args.comment,
args.getContext(),
format_settings,
args.storage_def,
args.mode);
return createQueueStorage<StorageS3Configuration>(args);
},
{
.supports_settings = true,
@ -82,6 +94,22 @@ void registerStorageS3Queue(StorageFactory & factory)
.source_access_type = AccessType::S3,
});
}
#endif
#if USE_AZURE_BLOB_STORAGE
void registerStorageAzureQueue(StorageFactory & factory)
{
factory.registerStorage(
"AzureQueue",
[](const StorageFactory::Arguments & args)
{
return createQueueStorage<StorageAzureConfiguration>(args);
},
{
.supports_settings = true,
.supports_schema_inference = true,
.source_access_type = AccessType::AZURE,
});
}
#endif
}
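
With the factory body moved into createQueueStorage<Configuration>, both engines now share the same registration path and DDL shape; side by side (sketch — the S3Queue argument order mirrors the url/auth/format pattern used in the integration test, and host, bucket and connection string are placeholders):

    CREATE TABLE s3_queue_example (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = S3Queue('http://minio:9000/bucket/data/', 'minio', 'minio123', 'CSV')
    SETTINGS mode = 'unordered';

    CREATE TABLE azure_queue_example_2 (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = AzureQueue('<connection_string>', 'cont', 'data/', 'CSV')
    SETTINGS mode = 'unordered';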

View File

@ -34,6 +34,7 @@ void registerStorageFuzzJSON(StorageFactory & factory);
void registerStorageS3(StorageFactory & factory);
void registerStorageHudi(StorageFactory & factory);
void registerStorageS3Queue(StorageFactory & factory);
void registerStorageAzureQueue(StorageFactory & factory);
#if USE_PARQUET
void registerStorageDeltaLake(StorageFactory & factory);
@ -126,6 +127,10 @@ void registerStorages()
registerStorageFuzzJSON(factory);
#endif
#if USE_AZURE_BLOB_STORAGE
registerStorageAzureQueue(factory);
#endif
#if USE_AWS_S3
registerStorageHudi(factory);
registerStorageS3Queue(factory);

View File

@ -12,6 +12,7 @@ import json
AVAILABLE_MODES = ["unordered", "ordered"]
DEFAULT_AUTH = ["'minio'", "'minio123'"]
NO_AUTH = ["NOSIGN"]
AZURE_CONTAINER_NAME = "cont"
def prepare_public_s3_bucket(started_cluster):
@ -84,6 +85,7 @@ def started_cluster():
"instance",
user_configs=["configs/users.xml"],
with_minio=True,
with_azurite=True,
with_zookeeper=True,
main_configs=[
"configs/zookeeper.xml",
@ -115,6 +117,9 @@ def started_cluster():
cluster.start()
logging.info("Cluster started")
container_client = cluster.blob_service_client.get_container_client(AZURE_CONTAINER_NAME)
container_client.create_container()
yield cluster
finally:
cluster.shutdown()
@ -134,6 +139,7 @@ def generate_random_files(
started_cluster,
files_path,
count,
storage = "s3",
column_num=3,
row_num=10,
start_ind=0,
@ -155,7 +161,10 @@ def generate_random_files(
values_csv = (
"\n".join((",".join(map(str, row)) for row in rand_values)) + "\n"
).encode()
put_s3_file_content(started_cluster, filename, values_csv, bucket)
if storage == "s3":
put_s3_file_content(started_cluster, filename, values_csv, bucket)
else:
put_azure_file_content(started_cluster, filename, values_csv, bucket)
return total_values
@ -164,6 +173,11 @@ def put_s3_file_content(started_cluster, filename, data, bucket=None):
buf = io.BytesIO(data)
started_cluster.minio_client.put_object(bucket, filename, buf, len(data))
def put_azure_file_content(started_cluster, filename, data, bucket=None):
client = started_cluster.blob_service_client.get_blob_client(AZURE_CONTAINER_NAME, filename)
buf = io.BytesIO(data)
client.upload_blob(buf, "BlockBlob", len(data))
def create_table(
started_cluster,
@ -171,6 +185,7 @@ def create_table(
table_name,
mode,
files_path,
engine_name = "S3Queue",
format="column1 UInt32, column2 UInt32, column3 UInt32",
additional_settings={},
file_format="CSV",
@ -189,11 +204,17 @@ def create_table(
}
settings.update(additional_settings)
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
engine_def = None
if engine_name == "S3Queue":
url = f"http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/{files_path}/"
engine_def = f"{engine_name}('{url}', {auth_params}, {file_format})"
else:
engine_def = f"{engine_name}('{started_cluster.env_variables['AZURITE_CONNECTION_STRING']}', 'cont', '{files_path}/', 'CSV')"
node.query(f"DROP TABLE IF EXISTS {table_name}")
create_query = f"""
CREATE TABLE {table_name} ({format})
ENGINE = S3Queue('{url}', {auth_params}, {file_format})
ENGINE = {engine_def}
SETTINGS {",".join((k+"="+repr(v) for k, v in settings.items()))}
"""
@ -224,17 +245,29 @@ def create_mv(
)
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_delete_after_processing(started_cluster, mode):
@pytest.mark.parametrize(
"mode, engine_name",
[
pytest.param("unordered", "S3Queue"),
pytest.param("unordered", "AzureQueue"),
pytest.param("ordered", "S3Queue"),
pytest.param("ordered", "AzureQueue"),
],
)
def test_delete_after_processing(started_cluster, mode, engine_name):
node = started_cluster.instances["instance"]
table_name = f"test.delete_after_processing_{mode}"
table_name = f"test.delete_after_processing_{mode}_{engine_name}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
files_num = 5
row_num = 10
if engine_name == "S3Queue":
storage = "s3"
else:
storage = "azure"
total_values = generate_random_files(
started_cluster, files_path, files_num, row_num=row_num
started_cluster, files_path, files_num, row_num=row_num, storage=storage
)
create_table(
started_cluster,
@ -243,6 +276,7 @@ def test_delete_after_processing(started_cluster, mode):
mode,
files_path,
additional_settings={"after_processing": "delete"},
engine_name=engine_name,
)
create_mv(node, table_name, dst_table_name)
@ -263,15 +297,29 @@ def test_delete_after_processing(started_cluster, mode):
).splitlines()
] == sorted(total_values, key=lambda x: (x[0], x[1], x[2]))
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
if engine_name == "S3Queue":
minio = started_cluster.minio_client
objects = list(minio.list_objects(started_cluster.minio_bucket, recursive=True))
assert len(objects) == 0
else:
client = started_cluster.blob_service_client.get_container_client(AZURE_CONTAINER_NAME)
objects_iterator = client.list_blobs(files_path)
for objects in objects_iterator:
assert False
@pytest.mark.parametrize("mode", AVAILABLE_MODES)
def test_failed_retry(started_cluster, mode):
@pytest.mark.parametrize(
"mode, engine_name",
[
pytest.param("unordered", "S3Queue"),
pytest.param("unordered", "AzureQueue"),
pytest.param("ordered", "S3Queue"),
pytest.param("ordered", "AzureQueue"),
],
)
def test_failed_retry(started_cluster, mode, engine_name):
node = started_cluster.instances["instance"]
table_name = f"test.failed_retry_{mode}"
table_name = f"test.failed_retry_{mode}_{engine_name}"
dst_table_name = f"{table_name}_dst"
files_path = f"{table_name}_data"
file_path = f"{files_path}/trash_test.csv"
@ -296,6 +344,7 @@ def test_failed_retry(started_cluster, mode):
"s3queue_loading_retries": retries_num,
"keeper_path": keeper_path,
},
engine_name=engine_name,
)
create_mv(node, table_name, dst_table_name)
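
For completeness: like S3Queue, a queue table is normally drained through a materialized view into a regular destination table, which is what the test's create_mv helper sets up. A hedged SQL equivalent with an illustrative destination schema, reading from the azure_queue_example table sketched at the top:

    CREATE TABLE azure_queue_dst (column1 UInt32, column2 UInt32, column3 UInt32)
    ENGINE = MergeTree ORDER BY column1;

    CREATE MATERIALIZED VIEW azure_queue_mv TO azure_queue_dst AS
    SELECT column1, column2, column3 FROM azure_queue_example;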