From 935870b2c2b8cdc57ba64bb3006e80870acd2a0d Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Wed, 27 Jan 2021 21:05:18 +0300 Subject: [PATCH] Add separate config directive for Buffer profile If you push data via Buffer engine then all your queries will be done from one user, however this is not always desired behavior, since this will not allow to limit queries with max_concurrent_queries_for_user and similar. --- programs/server/config.xml | 8 +++++++- src/Interpreters/Context.cpp | 12 ++++++++++++ src/Interpreters/Context.h | 3 +++ src/Storages/StorageBuffer.cpp | 27 ++++++++++++++++++--------- src/Storages/StorageBuffer.h | 12 ++---------- 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/programs/server/config.xml b/programs/server/config.xml index 849d3dc32ba..ca57987d901 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -421,9 +421,15 @@ - + + + + default diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 5c99d39dc2e..eec71bbd92a 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -331,6 +331,7 @@ struct ContextShared mutable std::optional external_models_loader; String default_profile_name; /// Default profile name used for default values. String system_profile_name; /// Profile used by system processes + String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying AccessControlManager access_control_manager; mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks. mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files. @@ -1297,6 +1298,13 @@ Context & Context::getGlobalContext() return *global_context; } +const Context & Context::getBufferContext() const +{ + if (!buffer_context) + throw Exception("Logical error: there is no buffer context", ErrorCodes::LOGICAL_ERROR); + return *buffer_context; +} + const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const { @@ -2219,6 +2227,10 @@ void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & confi shared->system_profile_name = config.getString("system_profile", shared->default_profile_name); setProfile(shared->system_profile_name); + + shared->buffer_profile_name = config.getString("buffer_profile", shared->system_profile_name); + buffer_context = std::make_shared(*this); + buffer_context->setProfile(shared->buffer_profile_name); } String Context::getDefaultProfileName() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 98ca3909fea..909b27eaeaa 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -254,6 +254,7 @@ private: Context * query_context = nullptr; Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this. Context * global_context = nullptr; /// Global context. Could be equal to this. + std::shared_ptr buffer_context;/// Buffer context. Could be equal to this. public: // Top-level OpenTelemetry trace context for the query. Makes sense only for @@ -542,6 +543,8 @@ public: Context & getGlobalContext(); bool hasGlobalContext() const { return global_context != nullptr; } + const Context & getBufferContext() const; + void setQueryContext(Context & context_) { query_context = &context_; } void setSessionContext(Context & context_) { session_context = &context_; } diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index ce74567c62b..024ad7e001f 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -72,14 +72,14 @@ StorageBuffer::StorageBuffer( const StorageID & destination_id_, bool allow_materialized_) : IStorage(table_id_) - , global_context(context_.getGlobalContext()) + , buffer_context(context_.getBufferContext()) , num_shards(num_shards_), buffers(num_shards_) , min_thresholds(min_thresholds_) , max_thresholds(max_thresholds_) , destination_id(destination_id_) , allow_materialized(allow_materialized_) , log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")")) - , bg_pool(global_context.getBufferFlushSchedulePool()) + , bg_pool(buffer_context.getBufferFlushSchedulePool()) { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); @@ -470,7 +470,7 @@ public: StoragePtr destination; if (storage.destination_id) { - destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context); + destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.buffer_context); if (destination.get() == &storage) throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP); } @@ -586,9 +586,9 @@ bool StorageBuffer::mayBenefitFromIndexForIn( void StorageBuffer::startup() { - if (global_context.getSettingsRef().readonly) + if (buffer_context.getSettingsRef().readonly) { - LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName()); + LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this.", getName()); } flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); }); @@ -605,7 +605,7 @@ void StorageBuffer::shutdown() try { - optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context); + optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, buffer_context); } catch (...) { @@ -646,6 +646,15 @@ bool StorageBuffer::optimize( return true; } +bool StorageBuffer::supportsPrewhere() const +{ + if (!destination_id) + return false; + auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context); + if (dest && dest.get() != this) + return dest->supportsPrewhere(); + return false; +} bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const { @@ -752,7 +761,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc Stopwatch watch; try { - writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context)); + writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context)); if (reset_block_structure) buffer.data.clear(); } @@ -834,7 +843,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl for (const auto & column : block_to_write) list_of_columns->children.push_back(std::make_shared(column.name)); - auto insert_context = Context(global_context); + auto insert_context = Context(buffer_context); insert_context.makeQueryContext(); InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized}; @@ -911,7 +920,7 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S std::optional StorageBuffer::totalRows(const Settings & settings) const { std::optional underlying_rows; - auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context); + auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context); if (underlying) underlying_rows = underlying->totalRows(settings); diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h index 9656c78637b..46907ca196b 100644 --- a/src/Storages/StorageBuffer.h +++ b/src/Storages/StorageBuffer.h @@ -93,15 +93,7 @@ public: const Context & context) override; bool supportsSampling() const override { return true; } - bool supportsPrewhere() const override - { - if (!destination_id) - return false; - auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, global_context); - if (dest && dest.get() != this) - return dest->supportsPrewhere(); - return false; - } + bool supportsPrewhere() const override; bool supportsFinal() const override { return true; } bool supportsIndexForIn() const override { return true; } @@ -120,7 +112,7 @@ public: private: - const Context & global_context; + const Context & buffer_context; struct Buffer {