Merge pull request #19721 from azat/buffer-profile

Add separate config directive for Buffer profile
alexey-milovidov committed via GitHub on 2021-02-13 02:43:41 +03:00 (commit 2fe2190a20)
9 changed files with 107 additions and 20 deletions

File: programs/server/config.xml

@@ -421,9 +421,15 @@
<!-- Comma-separated list of prefixes for user-defined settings. -->
<custom_settings_prefixes></custom_settings_prefixes>
<!-- System profile of settings. These settings are used by internal processes (Buffer storage, Distributed DDL worker and so on). -->
<!-- System profile of settings. These settings are used by internal processes (Distributed DDL worker and so on). -->
<!-- <system_profile>default</system_profile> -->
<!-- Buffer profile of settings.
These settings are used by the Buffer storage engine to flush data to the underlying table.
Default: taken from the system_profile directive.
-->
<!-- <buffer_profile>default</buffer_profile> -->
<!-- Default database. -->
<default_database>default</default_database>
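
A minimal end-to-end setup of the new directive would look roughly like the sketch below. It mirrors the integration-test configs added later in this commit; the profile name buffer_profile and the max_partitions_per_insert_block setting are illustrative choices, not requirements.

<!-- server config override: route Buffer flushes through a dedicated settings profile -->
<yandex>
    <buffer_profile>buffer_profile</buffer_profile>
</yandex>

<!-- users config (e.g. users.d/buffer_profile.xml): define the profile referenced above -->
<yandex>
    <profiles>
        <buffer_profile>
            <!-- any query-level settings; they apply only when Buffer flushes to its destination table -->
            <max_partitions_per_insert_block>1</max_partitions_per_insert_block>
        </buffer_profile>
    </profiles>
</yandex>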

File: src/Interpreters/Context.cpp

@@ -331,6 +331,7 @@ struct ContextShared
mutable std::optional<ExternalModelsLoader> external_models_loader;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
String buffer_profile_name; /// Profile used by the Buffer engine for flushing to the underlying table
AccessControlManager access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@@ -1297,6 +1298,13 @@ Context & Context::getGlobalContext()
return *global_context;
}
const Context & Context::getBufferContext() const
{
if (!buffer_context)
throw Exception("Logical error: there is no buffer context", ErrorCodes::LOGICAL_ERROR);
return *buffer_context;
}
const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
{
@@ -2219,6 +2227,10 @@ void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & confi
shared->system_profile_name = config.getString("system_profile", shared->default_profile_name);
setProfile(shared->system_profile_name);
shared->buffer_profile_name = config.getString("buffer_profile", shared->system_profile_name);
buffer_context = std::make_shared<Context>(*this);
buffer_context->setProfile(shared->buffer_profile_name);
}
String Context::getDefaultProfileName() const
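
Taken together, the profile used for flushing Buffer tables resolves as buffer_profile, then system_profile, then the default profile; the resolved profile is applied to a dedicated buffer context (a copy of the global context) that StorageBuffer uses instead of the global one. A hedged config sketch of that fallback, using the directives shown above plus the standard default_profile directive:

<yandex>
    <default_profile>default</default_profile>
    <!-- <system_profile>default</system_profile>  falls back to default_profile when absent -->
    <!-- <buffer_profile>default</buffer_profile>  falls back to system_profile when absent -->
</yandex>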

File: src/Interpreters/Context.h

@@ -254,6 +254,7 @@ private:
Context * query_context = nullptr;
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
std::shared_ptr<Context> buffer_context; /// Buffer context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
@@ -542,6 +543,8 @@ public:
Context & getGlobalContext();
bool hasGlobalContext() const { return global_context != nullptr; }
const Context & getBufferContext() const;
void setQueryContext(Context & context_) { query_context = &context_; }
void setSessionContext(Context & context_) { session_context = &context_; }

File: src/Storages/StorageBuffer.cpp

@@ -72,14 +72,14 @@ StorageBuffer::StorageBuffer(
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
, global_context(context_.getGlobalContext())
, buffer_context(context_.getBufferContext())
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
, bg_pool(global_context.getBufferFlushSchedulePool())
, bg_pool(buffer_context.getBufferFlushSchedulePool())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@@ -475,7 +475,7 @@ public:
StoragePtr destination;
if (storage.destination_id)
{
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context);
destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.buffer_context);
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
}
@@ -591,9 +591,9 @@ bool StorageBuffer::mayBenefitFromIndexForIn(
void StorageBuffer::startup()
{
if (global_context.getSettingsRef().readonly)
if (buffer_context.getSettingsRef().readonly)
{
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName());
LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this.", getName());
}
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); });
@@ -610,7 +610,7 @@ void StorageBuffer::shutdown()
try
{
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context);
optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, buffer_context);
}
catch (...)
{
@@ -651,6 +651,15 @@ bool StorageBuffer::optimize(
return true;
}
bool StorageBuffer::supportsPrewhere() const
{
if (!destination_id)
return false;
auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
if (dest && dest.get() != this)
return dest->supportsPrewhere();
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
@@ -757,7 +766,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
Stopwatch watch;
try
{
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));
writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context));
if (reset_block_structure)
buffer.data.clear();
}
@@ -839,7 +848,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
for (const auto & column : block_to_write)
list_of_columns->children.push_back(std::make_shared<ASTIdentifier>(column.name));
auto insert_context = Context(global_context);
auto insert_context = Context(buffer_context);
insert_context.makeQueryContext();
InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized};
@@ -916,7 +925,7 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
{
std::optional<UInt64> underlying_rows;
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
if (underlying)
underlying_rows = underlying->totalRows(settings);

File: src/Storages/StorageBuffer.h

@@ -93,15 +93,7 @@ public:
const Context & context) override;
bool supportsSampling() const override { return true; }
bool supportsPrewhere() const override
{
if (!destination_id)
return false;
auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
if (dest && dest.get() != this)
return dest->supportsPrewhere();
return false;
}
bool supportsPrewhere() const override;
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
@@ -120,7 +112,7 @@ public:
private:
const Context & global_context;
const Context & buffer_context;
struct Buffer
{

File: configs/buffer_profile.xml (new integration-test server config)

@@ -0,0 +1,3 @@
<yandex>
<buffer_profile>buffer_profile</buffer_profile>
</yandex>

File: configs/users.d/buffer_profile.xml (new integration-test users config)

@@ -0,0 +1,8 @@
<yandex>
<profiles>
<buffer_profile>
<max_partitions_per_insert_block>1</max_partitions_per_insert_block>
</buffer_profile>
</profiles>
</yandex>

File: test.py (new integration test)

@@ -0,0 +1,54 @@
# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long

import pytest

from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException

cluster = ClickHouseCluster(__file__)

node_default = cluster.add_instance('node_default')
node_buffer_profile = cluster.add_instance('node_buffer_profile',
                                            main_configs=['configs/buffer_profile.xml'],
                                            user_configs=['configs/users.d/buffer_profile.xml'])


@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


def bootstrap(node):
    # PARTITION BY key % 2 gives the destination table two partitions, so a single
    # flush of the buffer produces an INSERT block spanning two partitions.
    node.query("""
        CREATE TABLE data (key Int) Engine=MergeTree()
        ORDER BY key
        PARTITION BY key % 2;

        CREATE TABLE buffer AS data Engine=Buffer(currentDatabase(), data,
            /* settings for manual flush only */
            1,    /* num_layers */
            10e6, /* min_time, placeholder */
            10e6, /* max_time, placeholder */
            0,    /* min_rows */
            10e6, /* max_rows */
            0,    /* min_bytes */
            80e6  /* max_bytes */
        );

        INSERT INTO buffer SELECT * FROM numbers(100);
    """)


def test_default_profile():
    bootstrap(node_default)
    # flush the buffer
    node_default.query('OPTIMIZE TABLE buffer')


def test_buffer_profile():
    bootstrap(node_buffer_profile)
    # max_partitions_per_insert_block=1 from the buffer profile must make the flush fail
    with pytest.raises(QueryRuntimeException, match='Too many partitions for single INSERT block'):
        # flush the buffer
        node_buffer_profile.query('OPTIMIZE TABLE buffer')