diff --git a/programs/server/config.xml b/programs/server/config.xml
index 849d3dc32ba..ca57987d901 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -421,9 +421,15 @@
-
+
+
+
+
default
diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp
index 5c99d39dc2e..eec71bbd92a 100644
--- a/src/Interpreters/Context.cpp
+++ b/src/Interpreters/Context.cpp
@@ -331,6 +331,7 @@ struct ContextShared
mutable std::optional external_models_loader;
String default_profile_name; /// Default profile name used for default values.
String system_profile_name; /// Profile used by system processes
+ String buffer_profile_name; /// Profile used by Buffer engine for flushing to the underlying
AccessControlManager access_control_manager;
mutable UncompressedCachePtr uncompressed_cache; /// The cache of decompressed blocks.
mutable MarkCachePtr mark_cache; /// Cache of marks in compressed files.
@@ -1297,6 +1298,13 @@ Context & Context::getGlobalContext()
return *global_context;
}
+const Context & Context::getBufferContext() const
+{
+ if (!buffer_context)
+ throw Exception("Logical error: there is no buffer context", ErrorCodes::LOGICAL_ERROR);
+ return *buffer_context;
+}
+
const EmbeddedDictionaries & Context::getEmbeddedDictionaries() const
{
@@ -2219,6 +2227,10 @@ void Context::setDefaultProfiles(const Poco::Util::AbstractConfiguration & confi
shared->system_profile_name = config.getString("system_profile", shared->default_profile_name);
setProfile(shared->system_profile_name);
+
+ shared->buffer_profile_name = config.getString("buffer_profile", shared->system_profile_name);
+ buffer_context = std::make_shared(*this);
+ buffer_context->setProfile(shared->buffer_profile_name);
}
String Context::getDefaultProfileName() const
diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h
index 98ca3909fea..909b27eaeaa 100644
--- a/src/Interpreters/Context.h
+++ b/src/Interpreters/Context.h
@@ -254,6 +254,7 @@ private:
Context * query_context = nullptr;
Context * session_context = nullptr; /// Session context or nullptr. Could be equal to this.
Context * global_context = nullptr; /// Global context. Could be equal to this.
+ std::shared_ptr buffer_context;/// Buffer context. Could be equal to this.
public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for
@@ -542,6 +543,8 @@ public:
Context & getGlobalContext();
bool hasGlobalContext() const { return global_context != nullptr; }
+ const Context & getBufferContext() const;
+
void setQueryContext(Context & context_) { query_context = &context_; }
void setSessionContext(Context & context_) { session_context = &context_; }
diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp
index bf02a04c704..e28d5f4d6d1 100644
--- a/src/Storages/StorageBuffer.cpp
+++ b/src/Storages/StorageBuffer.cpp
@@ -72,14 +72,14 @@ StorageBuffer::StorageBuffer(
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
- , global_context(context_.getGlobalContext())
+ , buffer_context(context_.getBufferContext())
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
- , bg_pool(global_context.getBufferFlushSchedulePool())
+ , bg_pool(buffer_context.getBufferFlushSchedulePool())
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
@@ -475,7 +475,7 @@ public:
StoragePtr destination;
if (storage.destination_id)
{
- destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.global_context);
+ destination = DatabaseCatalog::instance().tryGetTable(storage.destination_id, storage.buffer_context);
if (destination.get() == &storage)
throw Exception("Destination table is myself. Write will cause infinite loop.", ErrorCodes::INFINITE_LOOP);
}
@@ -591,9 +591,9 @@ bool StorageBuffer::mayBenefitFromIndexForIn(
void StorageBuffer::startup()
{
- if (global_context.getSettingsRef().readonly)
+ if (buffer_context.getSettingsRef().readonly)
{
- LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate system_profile to fix this.", getName());
+ LOG_WARNING(log, "Storage {} is run with readonly settings, it will not be able to insert data. Set appropriate buffer_profile to fix this.", getName());
}
flush_handle = bg_pool.createTask(log->name() + "/Bg", [this]{ backgroundFlush(); });
@@ -610,7 +610,7 @@ void StorageBuffer::shutdown()
try
{
- optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, global_context);
+ optimize(nullptr /*query*/, getInMemoryMetadataPtr(), {} /*partition*/, false /*final*/, false /*deduplicate*/, {}, buffer_context);
}
catch (...)
{
@@ -651,6 +651,15 @@ bool StorageBuffer::optimize(
return true;
}
+bool StorageBuffer::supportsPrewhere() const
+{
+ if (!destination_id)
+ return false;
+ auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
+ if (dest && dest.get() != this)
+ return dest->supportsPrewhere();
+ return false;
+}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
@@ -757,7 +766,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
Stopwatch watch;
try
{
- writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, global_context));
+ writeBlockToDestination(block_to_write, DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context));
if (reset_block_structure)
buffer.data.clear();
}
@@ -839,7 +848,7 @@ void StorageBuffer::writeBlockToDestination(const Block & block, StoragePtr tabl
for (const auto & column : block_to_write)
list_of_columns->children.push_back(std::make_shared(column.name));
- auto insert_context = Context(global_context);
+ auto insert_context = Context(buffer_context);
insert_context.makeQueryContext();
InterpreterInsertQuery interpreter{insert, insert_context, allow_materialized};
@@ -916,7 +925,7 @@ void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const S
std::optional StorageBuffer::totalRows(const Settings & settings) const
{
std::optional underlying_rows;
- auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
+ auto underlying = DatabaseCatalog::instance().tryGetTable(destination_id, buffer_context);
if (underlying)
underlying_rows = underlying->totalRows(settings);
diff --git a/src/Storages/StorageBuffer.h b/src/Storages/StorageBuffer.h
index 9656c78637b..46907ca196b 100644
--- a/src/Storages/StorageBuffer.h
+++ b/src/Storages/StorageBuffer.h
@@ -93,15 +93,7 @@ public:
const Context & context) override;
bool supportsSampling() const override { return true; }
- bool supportsPrewhere() const override
- {
- if (!destination_id)
- return false;
- auto dest = DatabaseCatalog::instance().tryGetTable(destination_id, global_context);
- if (dest && dest.get() != this)
- return dest->supportsPrewhere();
- return false;
- }
+ bool supportsPrewhere() const override;
bool supportsFinal() const override { return true; }
bool supportsIndexForIn() const override { return true; }
@@ -120,7 +112,7 @@ public:
private:
- const Context & global_context;
+ const Context & buffer_context;
struct Buffer
{
diff --git a/tests/integration/test_buffer_profile/__init__.py b/tests/integration/test_buffer_profile/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_buffer_profile/configs/buffer_profile.xml b/tests/integration/test_buffer_profile/configs/buffer_profile.xml
new file mode 100644
index 00000000000..6ce6de70e63
--- /dev/null
+++ b/tests/integration/test_buffer_profile/configs/buffer_profile.xml
@@ -0,0 +1,3 @@
+
+ buffer_profile
+
diff --git a/tests/integration/test_buffer_profile/configs/users.d/buffer_profile.xml b/tests/integration/test_buffer_profile/configs/users.d/buffer_profile.xml
new file mode 100644
index 00000000000..2edd2b63dc6
--- /dev/null
+++ b/tests/integration/test_buffer_profile/configs/users.d/buffer_profile.xml
@@ -0,0 +1,8 @@
+
+
+
+ 1
+
+
+
+
diff --git a/tests/integration/test_buffer_profile/test.py b/tests/integration/test_buffer_profile/test.py
new file mode 100644
index 00000000000..ae9220898ab
--- /dev/null
+++ b/tests/integration/test_buffer_profile/test.py
@@ -0,0 +1,54 @@
+# pylint: disable=unused-argument
+# pylint: disable=redefined-outer-name
+# pylint: disable=line-too-long
+
+import pytest
+
+from helpers.cluster import ClickHouseCluster
+from helpers.client import QueryRuntimeException
+
+cluster = ClickHouseCluster(__file__)
+
+node_default = cluster.add_instance('node_default')
+node_buffer_profile = cluster.add_instance('node_buffer_profile',
+ main_configs=['configs/buffer_profile.xml'],
+ user_configs=['configs/users.d/buffer_profile.xml'])
+
+@pytest.fixture(scope='module', autouse=True)
+def start_cluster():
+ try:
+ cluster.start()
+ yield cluster
+ finally:
+ cluster.shutdown()
+
+def bootstrap(node):
+ node.query("""
+ CREATE TABLE data (key Int) Engine=MergeTree()
+ ORDER BY key
+ PARTITION BY key % 2;
+
+ CREATE TABLE buffer AS data Engine=Buffer(currentDatabase(), data,
+ /* settings for manual flush only */
+ 1, /* num_layers */
+ 10e6, /* min_time, placeholder */
+ 10e6, /* max_time, placeholder */
+ 0, /* min_rows */
+ 10e6, /* max_rows */
+ 0, /* min_bytes */
+ 80e6 /* max_bytes */
+ );
+
+ INSERT INTO buffer SELECT * FROM numbers(100);
+ """)
+
+def test_default_profile():
+ bootstrap(node_default)
+ # flush the buffer
+ node_default.query('OPTIMIZE TABLE buffer')
+
+def test_buffer_profile():
+ bootstrap(node_buffer_profile)
+ with pytest.raises(QueryRuntimeException, match='Too many partitions for single INSERT block'):
+ # flush the buffer
+ node_buffer_profile.query('OPTIMIZE TABLE buffer')