mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-15 12:14:18 +00:00
Support system.s3/azure_queue_log
This commit is contained in:
parent
3d95774197
commit
6a3d109444
@ -31,10 +31,10 @@ class ASTStorage;
|
||||
M(UInt32, cleanup_interval_min_ms, 60000, "For unordered mode. Polling backoff min for cleanup", 0) \
|
||||
M(UInt32, cleanup_interval_max_ms, 60000, "For unordered mode. Polling backoff max for cleanup", 0) \
|
||||
M(UInt32, buckets, 0, "Number of buckets for Ordered mode parallel processing", 0) \
|
||||
M(UInt32, max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
|
||||
M(UInt32, max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
|
||||
M(UInt32, max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
|
||||
M(UInt32, max_processing_time_sec_before_commit, 0, "Timeout in seconds after which to commit files committed to keeper", 0) \
|
||||
M(UInt64, max_processed_files_before_commit, 100, "Number of files which can be processed before being committed to keeper", 0) \
|
||||
M(UInt64, max_processed_rows_before_commit, 0, "Number of rows which can be processed before being committed to keeper", 0) \
|
||||
M(UInt64, max_processed_bytes_before_commit, 0, "Number of bytes which can be processed before being committed to keeper", 0) \
|
||||
M(UInt64, max_processing_time_sec_before_commit, 0, "Timeout in seconds after which to commit files committed to keeper", 0) \
|
||||
|
||||
#define LIST_OF_OBJECT_STORAGE_QUEUE_SETTINGS(M, ALIAS) \
|
||||
OBJECT_STORAGE_QUEUE_RELATED_SETTINGS(M, ALIAS) \
|
||||
|
@ -22,13 +22,13 @@ struct ObjectStorageQueueTableMetadata
|
||||
const String columns;
|
||||
const String after_processing;
|
||||
const String mode;
|
||||
const UInt64 tracked_files_limit;
|
||||
const UInt64 tracked_files_ttl_sec;
|
||||
const UInt64 buckets;
|
||||
const UInt32 tracked_files_limit;
|
||||
const UInt32 tracked_files_ttl_sec;
|
||||
const UInt32 buckets;
|
||||
const String last_processed_path;
|
||||
const UInt64 loading_retries;
|
||||
const UInt32 loading_retries;
|
||||
|
||||
UInt64 processing_threads_num; /// Can be changed from keeper.
|
||||
UInt32 processing_threads_num; /// Can be changed from keeper.
|
||||
bool processing_threads_num_changed = false;
|
||||
|
||||
ObjectStorageQueueTableMetadata(
|
||||
|
@ -127,7 +127,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
|
||||
const String & comment,
|
||||
ContextPtr context_,
|
||||
std::optional<FormatSettings> format_settings_,
|
||||
ASTStorage * /* engine_args */,
|
||||
ASTStorage * engine_args,
|
||||
LoadingStrictnessLevel mode)
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_)
|
||||
@ -176,6 +176,7 @@ StorageObjectStorageQueue::StorageObjectStorageQueue(
|
||||
storage_metadata.setColumns(columns);
|
||||
storage_metadata.setConstraints(constraints_);
|
||||
storage_metadata.setComment(comment);
|
||||
storage_metadata.settings_changes = engine_args->settings->ptr();
|
||||
setVirtuals(VirtualColumnUtils::getVirtualsForFileLikeStorage(storage_metadata.columns, context_));
|
||||
setInMemoryMetadata(storage_metadata);
|
||||
|
||||
@ -415,7 +416,7 @@ void StorageObjectStorageQueue::threadFunc()
|
||||
else
|
||||
{
|
||||
/// Increase the reschedule interval.
|
||||
reschedule_processing_interval_ms = std::min(polling_max_timeout_ms, reschedule_processing_interval_ms + polling_backoff_ms);
|
||||
reschedule_processing_interval_ms = std::min<size_t>(polling_max_timeout_ms, reschedule_processing_interval_ms + polling_backoff_ms);
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Stopped streaming to {} attached views", dependencies_count);
|
||||
@ -542,4 +543,31 @@ std::shared_ptr<StorageObjectStorageQueue::FileIterator> StorageObjectStorageQue
|
||||
return std::make_shared<FileIterator>(files_metadata, std::move(glob_iterator), shutdown_called, log);
|
||||
}
|
||||
|
||||
/// Reconstructs the queue settings of this table.
///
/// We do not store queue settings
/// (because of the inconvenience of keeping them in sync with ObjectStorageQueueTableMetadata),
/// so they are rebuilt here from the table metadata, the commit settings
/// and the values the storage keeps as its own members.
ObjectStorageQueueSettings StorageObjectStorageQueue::getSettings() const
{
    ObjectStorageQueueSettings settings;
    const auto & table_metadata = getTableMetadata();

    /// Values taken from the table metadata.
    settings.after_processing = table_metadata.after_processing;
    settings.loading_retries = table_metadata.loading_retries;
    settings.processing_threads_num = table_metadata.processing_threads_num;
    settings.last_processed_path = table_metadata.last_processed_path;
    settings.buckets = table_metadata.buckets;
    /// The tracked-files limits are part of the table metadata as well,
    /// so report the real values instead of a hard-coded zero.
    settings.tracked_file_ttl_sec = table_metadata.tracked_files_ttl_sec;
    settings.tracked_files_limit = table_metadata.tracked_files_limit;

    /// Values kept by the storage itself.
    settings.keeper_path = zk_path;
    settings.enable_logging_to_queue_log = enable_logging_to_queue_log;
    settings.polling_min_timeout_ms = polling_min_timeout_ms;
    settings.polling_max_timeout_ms = polling_max_timeout_ms;
    settings.polling_backoff_ms = polling_backoff_ms;

    /// NOTE(review): the cleanup intervals are not read from any state used in this function,
    /// so they are reported as 0 — confirm whether they are available elsewhere.
    settings.cleanup_interval_min_ms = 0;
    settings.cleanup_interval_max_ms = 0;

    /// Commit thresholds.
    /// NOTE(review): `mode` and `max_processing_time_sec_before_commit` are not filled here
    /// and keep their default values — confirm whether they should be reported too.
    settings.max_processed_files_before_commit = commit_settings.max_processed_files_before_commit;
    settings.max_processed_rows_before_commit = commit_settings.max_processed_rows_before_commit;
    settings.max_processed_bytes_before_commit = commit_settings.max_processed_bytes_before_commit;

    return settings;
}
|
||||
|
||||
}
|
||||
|
@ -51,6 +51,8 @@ public:
|
||||
|
||||
zkutil::ZooKeeperPtr getZooKeeper() const;
|
||||
|
||||
ObjectStorageQueueSettings getSettings() const;
|
||||
|
||||
private:
|
||||
friend class ReadFromObjectStorageQueue;
|
||||
using FileIterator = ObjectStorageQueueSource::FileIterator;
|
||||
@ -58,9 +60,9 @@ private:
|
||||
|
||||
const fs::path zk_path;
|
||||
const bool enable_logging_to_queue_log;
|
||||
const size_t polling_min_timeout_ms;
|
||||
const size_t polling_max_timeout_ms;
|
||||
const size_t polling_backoff_ms;
|
||||
const UInt32 polling_min_timeout_ms;
|
||||
const UInt32 polling_max_timeout_ms;
|
||||
const UInt32 polling_backoff_ms;
|
||||
const CommitSettings commit_settings;
|
||||
|
||||
std::shared_ptr<ObjectStorageQueueMetadata> files_metadata;
|
||||
|
@ -0,0 +1,92 @@
|
||||
#include <Core/Settings.h>
|
||||
#include <DataTypes/DataTypeString.h>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <Parsers/ASTSetQuery.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Access/ContextAccess.h>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Storages/System/StorageSystemObjectStorageQueueSettings.h>
|
||||
#include <Storages/ObjectStorageQueue/StorageObjectStorageQueue.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
template <StorageObjectStorageQueueType type>
|
||||
ColumnsDescription StorageSystemObjectStorageQueueSettings<type>::getColumnsDescription()
|
||||
{
|
||||
return ColumnsDescription
|
||||
{
|
||||
{"database", std::make_shared<DataTypeString>(), "Database of the table with S3Queue Engine."},
|
||||
{"table", std::make_shared<DataTypeString>(), "Name of the table with S3Queue Engine."},
|
||||
{"name", std::make_shared<DataTypeString>(), "Setting name."},
|
||||
{"value", std::make_shared<DataTypeString>(), "Setting value."},
|
||||
{"type", std::make_shared<DataTypeString>(), "Setting type (implementation specific string value)."},
|
||||
{"changed", std::make_shared<DataTypeUInt8>(), "1 if the setting was explicitly defined in the config or explicitly changed."},
|
||||
{"description", std::make_shared<DataTypeString>(), "Setting description."},
|
||||
{"alterable", std::make_shared<DataTypeUInt8>(),
|
||||
"Shows whether the current user can change the setting via ALTER TABLE MODIFY SETTING: "
|
||||
"0 — Current user can change the setting, "
|
||||
"1 — Current user can't change the setting."
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
template <StorageObjectStorageQueueType type>
|
||||
void StorageSystemObjectStorageQueueSettings<type>::fillData(
|
||||
MutableColumns & res_columns,
|
||||
ContextPtr context,
|
||||
const ActionsDAG::Node *,
|
||||
std::vector<UInt8>) const
|
||||
{
|
||||
auto add_table = [&](
|
||||
const DatabaseTablesIteratorPtr & it, StorageObjectStorageQueue & storage)
|
||||
{
|
||||
/// We cannot use setting.isValueChanged(), because we do not store initial settings in storage.
|
||||
/// Therefore check if the setting was changed via table metadata.
|
||||
const auto & settings_changes = storage.getInMemoryMetadataPtr()->settings_changes->as<ASTSetQuery>()->changes;
|
||||
auto is_changed = [&](const std::string & setting_name) -> bool
|
||||
{
|
||||
return settings_changes.end() != std::find_if(
|
||||
settings_changes.begin(), settings_changes.end(),
|
||||
[&](const SettingChange & change){ return change.name == setting_name; });
|
||||
};
|
||||
|
||||
for (const auto & change : storage.getSettings())
|
||||
{
|
||||
size_t i = 0;
|
||||
res_columns[i++]->insert(it->databaseName());
|
||||
res_columns[i++]->insert(it->name());
|
||||
res_columns[i++]->insert(change.getName());
|
||||
res_columns[i++]->insert(convertFieldToString(change.getValue()));
|
||||
res_columns[i++]->insert(change.getTypeName());
|
||||
res_columns[i++]->insert(is_changed(change.getName()));
|
||||
res_columns[i++]->insert(change.getDescription());
|
||||
res_columns[i++]->insert(false);
|
||||
}
|
||||
};
|
||||
|
||||
const auto access = context->getAccess();
|
||||
const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES);
|
||||
if (show_tables_granted)
|
||||
{
|
||||
auto databases = DatabaseCatalog::instance().getDatabases();
|
||||
for (const auto & db : databases)
|
||||
{
|
||||
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
|
||||
{
|
||||
StoragePtr storage = iterator->table();
|
||||
if (auto * queue_table = dynamic_cast<StorageObjectStorageQueue *>(storage.get()))
|
||||
{
|
||||
add_table(iterator, *queue_table);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
template class StorageSystemObjectStorageQueueSettings<StorageObjectStorageQueueType::S3>;
|
||||
template class StorageSystemObjectStorageQueueSettings<StorageObjectStorageQueueType::Azure>;
|
||||
}
|
@ -0,0 +1,35 @@
|
||||
#pragma once
|
||||
#include <Storages/System/IStorageSystemOneBlock.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
enum class StorageObjectStorageQueueType
|
||||
{
|
||||
S3,
|
||||
Azure,
|
||||
};
|
||||
|
||||
template <StorageObjectStorageQueueType type>
|
||||
class StorageSystemObjectStorageQueueSettings final : public IStorageSystemOneBlock
|
||||
{
|
||||
public:
|
||||
static constexpr auto name = type == StorageObjectStorageQueueType::S3 ? "SystemS3QueueSettings" : "SystemAzureQueueSettings";
|
||||
|
||||
std::string getName() const override { return name; }
|
||||
|
||||
static ColumnsDescription getColumnsDescription();
|
||||
|
||||
protected:
|
||||
using IStorageSystemOneBlock::IStorageSystemOneBlock;
|
||||
|
||||
void fillData(
|
||||
MutableColumns & res_columns,
|
||||
ContextPtr context,
|
||||
const ActionsDAG::Node *,
|
||||
std::vector<UInt8>) const override;
|
||||
};
|
||||
|
||||
}
|
@ -92,6 +92,7 @@
|
||||
#include <Storages/System/StorageSystemJemalloc.h>
|
||||
#include <Storages/System/StorageSystemScheduler.h>
|
||||
#include <Storages/System/StorageSystemS3Queue.h>
|
||||
#include <Storages/System/StorageSystemObjectStorageQueueSettings.h>
|
||||
#include <Storages/System/StorageSystemDashboards.h>
|
||||
#include <Storages/System/StorageSystemViewRefreshes.h>
|
||||
#include <Storages/System/StorageSystemDNSCache.h>
|
||||
@ -227,6 +228,8 @@ void attachSystemTablesServer(ContextPtr context, IDatabase & system_database, b
|
||||
attach<StorageSystemUserProcesses>(context, system_database, "user_processes", "This system table can be used to get overview of memory usage and ProfileEvents of users.");
|
||||
attachNoDescription<StorageSystemJemallocBins>(context, system_database, "jemalloc_bins", "Contains information about memory allocations done via jemalloc allocator in different size classes (bins) aggregated from all arenas. These statistics might not be absolutely accurate because of thread local caching in jemalloc.");
|
||||
attachNoDescription<StorageSystemS3Queue>(context, system_database, "s3queue", "Contains in-memory state of S3Queue metadata and currently processed rows per file.");
|
||||
attach<StorageSystemObjectStorageQueueSettings<StorageObjectStorageQueueType::S3>>(context, system_database, "s3_queue_settings", "Contains a list of settings of S3Queue tables.");
|
||||
attach<StorageSystemObjectStorageQueueSettings<StorageObjectStorageQueueType::Azure>>(context, system_database, "azure_queue_settings", "Contains a list of settings of AzureQueue tables.");
|
||||
attach<StorageSystemDashboards>(context, system_database, "dashboards", "Contains queries used by /dashboard page accessible though HTTP interface. This table can be useful for monitoring and troubleshooting. The table contains a row for every chart in a dashboard.");
|
||||
attach<StorageSystemViewRefreshes>(context, system_database, "view_refreshes", "Lists all Refreshable Materialized Views of current server.");
|
||||
|
||||
|
@ -2073,6 +2073,10 @@ def test_processing_threads(started_cluster):
|
||||
f"SELECT * FROM system.zookeeper WHERE path = '{keeper_path}'"
|
||||
)
|
||||
|
||||
assert 32 == int(node.query(
|
||||
f"SELECT value FROM system.s3_queue_settings WHERE table = '{table_name}' and name = 'processing_threads_num'"
|
||||
))
|
||||
|
||||
total_values = generate_random_files(
|
||||
started_cluster, files_path, files_to_generate, start_ind=0, row_num=1
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user