Add separate config directive for Buffer profile

If you push data via Buffer engine then all your queries will be done
from one user, however this is not always desired behavior, since this
will not allow to limit queries with max_concurrent_queries_for_user and
similar.
This commit is contained in:
Azat Khuzhin 2021-01-27 21:05:18 +03:00
parent be831d09f7
commit 935870b2c2
5 changed files with 42 additions and 20 deletions

View File

@ -421,9 +421,15 @@
<!-- Comma-separated list of prefixes for user-defined settings. -->
<custom_settings_prefixes></custom_settings_prefixes>
<!-- System profile of settings. This settings are used by internal processes (Buffer storage, Distributed DDL worker and so on). -->
<!-- System profile of settings. This settings are used by internal processes (Distributed DDL worker and so on). -->
<!-- <system_profile>default</system_profile> -->
<!-- Buffer profile of settings.
This settings are used by Buffer storage to flush data to the underlying table.
Default: used from system_profile directive.
-->
<!-- <buffer_profile>default</buffer_profile> -->
<!-- Default database. -->
<default_database>default</default_database>

View File

@ -331,6 +331,7 @@ struct ContextShared
mutable std::optional<ExternalModelsLoader> external_models_loader;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
AccessControlManager access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@ -1297,6 +1298,13 @@ Context & Context::getGlobalContext()
return *global_context;
}
const Context & Context::getBufferContext() const
{
if (!buffer_context)
throw Exception("Logical error: there is no buffer context", ErrorCodes::LOGICAL_ERROR);
return *buffer_context;
}
const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
{
@ -2219,6 +2227,10 @@ void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & confi
shared->system_profile_name = config.getString("system_profile", shared->default_profile_name);
setProfile(shared->system_profile_name);
shared->buffer_profile_name = config.getString("buffer_profile", shared->system_profile_name);
buffer_context = std::make_shared<Context>(*this);
buffer_context->setProfile(shared->buffer_profile_name);
}
String Context::getDefaultProfileName() const

View File

@ -254,6 +254,7 @@ private:
Context * query_context = nullptr;
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
std::shared_ptr<Context> buffer_context;/// Buffer context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
@ -542,6 +543,8 @@ public:
Context & getGlobalContext();
bool hasGlobalContext() const { return global_context != nullptr; }
const Context & getBufferContext() const;
void setQueryContext(Context & context_) { query_context = &context_; }
void setSessionContext(Context & context_) { session_context = &context_; }

View File

@ -72,14 +72,14 @@ StorageBuffer::StorageBuffer(
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, buffer_context(context_.getBufferContext())
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
, bg_pool(global_context.getBufferFlushSchedulePool())
, bg_pool(buffer_context.getBufferFlushSchedulePool())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@ -470,7 +470,7 @@ public:
StoragePtr destination;
if (storage.destination_id)
{
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context);
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.buffer_context);
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
}
@ -586,9 +586,9 @@ bool StorageBuffer::mayBenefitFromIndexForIn(
void StorageBuffer::startup()
{
if (global_context.getSettingsRef().readonly)
if (buffer_context.getSettingsRef().readonly)
{
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName());
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this.", getName());
}
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); });
@ -605,7 +605,7 @@ void StorageBuffer::shutdown()
try
{
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context);
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, buffer_context);
}
catch (...)
{
@ -646,6 +646,15 @@ bool StorageBuffer::optimize(
return true;
}
bool StorageBuffer::supportsPrewhere() const
{
if (!destination_id)
return false;
auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
if (dest && dest.get() != this)
return dest->supportsPrewhere();
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
@ -752,7 +761,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
Stopwatch watch;
try
{
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context));
if (reset_block_structure)
buffer.data.clear();
}
@ -834,7 +843,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
for (const auto & column : block_to_write)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
auto insert_context = Context(global_context);
auto insert_context = Context(buffer_context);
insert_context.makeQueryContext();
InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized};
@ -911,7 +920,7 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
{
std::optional<UInt64> underlying_rows;
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
if (underlying)
underlying_rows = underlying->totalRows(settings);

View File

@ -93,15 +93,7 @@ public:
const Context & context) override;
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override
{
if (!destination_id)
return false;
auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
if (dest && dest.get() != this)
return dest->supportsPrewhere();
return false;
}
bool supportsPrewhere() const override;
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
@ -120,7 +112,7 @@ public:
private:
const Context & global_context;
const Context & buffer_context;
struct Buffer
{