From 20f1256a2fce9ea9604d58bb4065302af06422ec Mon Sep 17 00:00:00 2001 From: vdimir Date: Thu, 4 Jul 2024 18:29:09 +0000 Subject: [PATCH 01/12] Debuging 02956_rocksdb_bulk_sink --- src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp b/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp index 31812406d34..6f0f5f19970 100644 --- a/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp +++ b/src/Storages/RocksDB/EmbeddedRocksDBBulkSink.cpp @@ -199,9 +199,10 @@ void EmbeddedRocksDBBulkSink::consume(Chunk chunk_) if (chunks_to_write.empty()) return; + size_t num_chunks = chunks_to_write.size(); auto [serialized_key_column, serialized_value_column] = serializeChunks(std::move(chunks_to_write)); auto sst_file_path = getTemporarySSTFilePath(); - LOG_DEBUG(getLogger("EmbeddedRocksDBBulkSink"), "Writing {} rows to SST file {}", serialized_key_column->size(), sst_file_path); + LOG_DEBUG(getLogger("EmbeddedRocksDBBulkSink"), "Writing {} rows from {} chunks to SST file {}", serialized_key_column->size(), num_chunks, sst_file_path); if (auto status = buildSSTFile(sst_file_path, *serialized_key_column, *serialized_value_column); !status.ok()) throw Exception(ErrorCodes::ROCKSDB_ERROR, "RocksDB write error: {}", status.ToString()); From 7ab4af85e5d5bea114f1c74dd5cbe4b5a6176772 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 8 Jul 2024 12:36:02 +0200 Subject: [PATCH 02/12] Delete flaky case from 02956_rocksdb_bulk_sink --- tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh index f7111d0afe2..4e6e123bba2 100755 --- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh +++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh @@ -25,13 +25,6 @@ ${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 F ${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 1 ${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;" -# Testing insert with multiple sinks and fixed block size -${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;" -# Must set both max_threads and max_insert_threads to 2 to make sure there is only two sinks -${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_threads = 2, max_insert_threads = 2, max_block_size = 10000, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0, insert_deduplication_token = '', optimize_trivial_insert_select = 1;" -${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 2 because default bulk sink size is ~1M rows / SST file -${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;" - # Testing insert with duplicated keys ${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;" ${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number % 1000, number+1 FROM numbers_mt(1000000) SETTINGS max_block_size = 100000, max_insert_threads = 1, optimize_trivial_insert_select = 1;" From 1e924768f95250acbfeed67841e40f7ec2fcf9c5 Mon Sep 17 00:00:00 2001 From: vdimir Date: Mon, 8 Jul 2024 16:14:51 +0200 Subject: [PATCH 03/12] fix 02956_rocksdb_bulk_sink.reference --- tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference index 74c71827e6e..2b887148ffb 100644 --- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference +++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference @@ -3,7 +3,6 @@ 1000 1 1000 -2 1000000 1000 0 999001 From 7cf38826afa53deccee9aeb904d98bf98ae78d20 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Mon, 8 Jul 2024 17:48:49 +0200 Subject: [PATCH 04/12] Fix detection of number of CPUs in containers In the case when the 'parent' cgroup was used (i.e. name of cgroup was empty, which is common for containers) ClickHouse was ignoring the CPU limits set for the container. --- base/base/cgroupsv2.cpp | 17 ++++++++++------- base/base/cgroupsv2.h | 6 +++--- base/base/getMemoryAmount.cpp | 6 ++++-- src/Common/CgroupsMemoryUsageObserver.cpp | 6 ++++-- src/Common/getNumberOfPhysicalCPUCores.cpp | 10 +++++----- 5 files changed, 26 insertions(+), 19 deletions(-) diff --git a/base/base/cgroupsv2.cpp b/base/base/cgroupsv2.cpp index f20b9daf22e..466ebbc3ffb 100644 --- a/base/base/cgroupsv2.cpp +++ b/base/base/cgroupsv2.cpp @@ -33,8 +33,9 @@ bool cgroupsV2MemoryControllerEnabled() /// According to https://docs.kernel.org/admin-guide/cgroup-v2.html, file "cgroup.controllers" defines which controllers are available /// for the current + child cgroups. The set of available controllers can be restricted from level to level using file /// "cgroups.subtree_control". It is therefore sufficient to check the bottom-most nested "cgroup.controllers" file. - std::string cgroup = cgroupV2OfProcess(); - auto cgroup_dir = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup); + auto cgroup_dir = currentCGroupV2Path(); + if (cgroup_dir.empty()) + return false; std::ifstream controllers_file(cgroup_dir / "cgroup.controllers"); if (!controllers_file.is_open()) return false; @@ -46,7 +47,7 @@ bool cgroupsV2MemoryControllerEnabled() #endif } -std::string cgroupV2OfProcess() +std::filesystem::path currentCGroupV2Path() { #if defined(OS_LINUX) chassert(cgroupsV2Enabled()); @@ -54,17 +55,19 @@ std::string cgroupV2OfProcess() /// A simpler way to get the membership is: std::ifstream cgroup_name_file("/proc/self/cgroup"); if (!cgroup_name_file.is_open()) - return ""; + return {}; /// With cgroups v2, there will be a *single* line with prefix "0::/" /// (see https://docs.kernel.org/admin-guide/cgroup-v2.html) std::string cgroup; std::getline(cgroup_name_file, cgroup); static const std::string v2_prefix = "0::/"; if (!cgroup.starts_with(v2_prefix)) - return ""; + return {}; + + // the 'root' cgroup can have empty path, which is valid cgroup = cgroup.substr(v2_prefix.length()); - return cgroup; + return default_cgroups_mount / cgroup; #else - return ""; + return {}; #endif } diff --git a/base/base/cgroupsv2.h b/base/base/cgroupsv2.h index 70219d87cd1..2c58682ce31 100644 --- a/base/base/cgroupsv2.h +++ b/base/base/cgroupsv2.h @@ -16,7 +16,7 @@ bool cgroupsV2Enabled(); /// Assumes that cgroupsV2Enabled() is enabled. bool cgroupsV2MemoryControllerEnabled(); -/// Which cgroup does the process belong to? -/// Returns an empty string if the cgroup cannot be determined. +/// Detects which cgroup the process belong and returns the path to it in sysfs (for cgroups v2). +/// Returns an empty path if the cgroup cannot be determined. /// Assumes that cgroupsV2Enabled() is enabled. -std::string cgroupV2OfProcess(); +std::filesystem::path currentCGroupV2Path(); diff --git a/base/base/getMemoryAmount.cpp b/base/base/getMemoryAmount.cpp index f47cba9833d..9bd5ad75445 100644 --- a/base/base/getMemoryAmount.cpp +++ b/base/base/getMemoryAmount.cpp @@ -23,8 +23,10 @@ std::optional getCgroupsV2MemoryLimit() if (!cgroupsV2MemoryControllerEnabled()) return {}; - std::string cgroup = cgroupV2OfProcess(); - auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup); + auto current_cgroup = currentCGroupV2Path(); + + if (current_cgroup.empty()) + return {}; /// Open the bottom-most nested memory limit setting file. If there is no such file at the current /// level, try again at the parent level as memory settings are inherited. diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index d36c7fd08aa..e034319b21f 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -129,8 +129,10 @@ std::optional getCgroupsV2Path() if (!cgroupsV2MemoryControllerEnabled()) return {}; - String cgroup = cgroupV2OfProcess(); - auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup); + auto current_cgroup = currentCGroupV2Path(); + + if (current_cgroup.empty()) + return {}; /// Return the bottom-most nested current memory file. If there is no such file at the current /// level, try again at the parent level as memory settings are inherited. diff --git a/src/Common/getNumberOfPhysicalCPUCores.cpp b/src/Common/getNumberOfPhysicalCPUCores.cpp index 7e18a93e6ed..b16c635f23e 100644 --- a/src/Common/getNumberOfPhysicalCPUCores.cpp +++ b/src/Common/getNumberOfPhysicalCPUCores.cpp @@ -37,12 +37,12 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count) /// cgroupsv2 if (cgroupsV2Enabled()) { - /// First, we identify the cgroup the process belongs - std::string cgroup = cgroupV2OfProcess(); - if (cgroup.empty()) + /// First, we identify the path of the cgroup the process belongs + auto cgroup_path = currentCGroupV2Path(); + if (cgroup_path.empty()) return default_cpu_count; - auto current_cgroup = cgroup.empty() ? default_cgroups_mount : (default_cgroups_mount / cgroup); + auto current_cgroup = cgroup_path; // Looking for cpu.max in directories from the current cgroup to the top level // It does not stop on the first time since the child could have a greater value than parent @@ -62,7 +62,7 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count) } current_cgroup = current_cgroup.parent_path(); } - current_cgroup = default_cgroups_mount / cgroup; + current_cgroup = cgroup_path; // Looking for cpuset.cpus.effective in directories from the current cgroup to the top level while (current_cgroup != default_cgroups_mount.parent_path()) { From 859c63298ef044d92c585921579f90dd6e56deda Mon Sep 17 00:00:00 2001 From: vdimir Date: Tue, 9 Jul 2024 10:51:40 +0000 Subject: [PATCH 05/12] upd 02956_rocksdb_bulk_sink --- .../queries/0_stateless/02956_rocksdb_bulk_sink.reference | 1 + tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh | 7 +++++++ 2 files changed, 8 insertions(+) diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference index 2b887148ffb..1f140df1d6b 100644 --- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference +++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.reference @@ -3,6 +3,7 @@ 1000 1 1000 +1 1000000 1000 0 999001 diff --git a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh index 4e6e123bba2..95c136584f0 100755 --- a/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh +++ b/tests/queries/0_stateless/02956_rocksdb_bulk_sink.sh @@ -25,6 +25,13 @@ ${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 F ${CLICKHOUSE_CLIENT} --query "SELECT sum(value) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be 1 ${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;" +# Testing insert with multiple sinks and fixed block size +${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;" +# Must set both max_threads and max_insert_threads to 2 to make sure there is only two sinks +${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number, number+1 FROM numbers_mt(1000000) SETTINGS max_threads = 2, max_insert_threads = 2, max_block_size = 10000, min_insert_block_size_rows = 0, min_insert_block_size_bytes = 0, insert_deduplication_token = '', optimize_trivial_insert_select = 1;" +${CLICKHOUSE_CLIENT} --query "SELECT sum(value) IN (1, 2) FROM system.rocksdb WHERE database = currentDatabase() AND table = 'rocksdb_worm' AND name = 'no.file.opens';" # should be not more than 2 because default bulk sink size is ~1M rows / SST file. +${CLICKHOUSE_CLIENT} --query "SELECT count() FROM rocksdb_worm;" + # Testing insert with duplicated keys ${CLICKHOUSE_CLIENT} --query "TRUNCATE TABLE rocksdb_worm;" ${CLICKHOUSE_CLIENT} --query "INSERT INTO rocksdb_worm SELECT number % 1000, number+1 FROM numbers_mt(1000000) SETTINGS max_block_size = 100000, max_insert_threads = 1, optimize_trivial_insert_select = 1;" From 7781067ba18ed4817770a804f4407c670dea5550 Mon Sep 17 00:00:00 2001 From: vdimir Date: Thu, 11 Jul 2024 16:53:38 +0000 Subject: [PATCH 06/12] Fix pushdown for join with external table Fix possible incorrect result for queries joining and filtering table external engine (like PostgreSQL), due to too agressive filter pushdown. Since now, conditions from where section won't be send to external database in case of outer join with external table. --- .../transformQueryForExternalDatabase.cpp | 2 +- ...nsformQueryForExternalDatabaseAnalyzer.cpp | 29 ++++++++-- ...ransformQueryForExternalDatabaseAnalyzer.h | 2 +- .../test_storage_postgresql/test.py | 54 +++++++++++++++++++ 4 files changed, 82 insertions(+), 5 deletions(-) diff --git a/src/Storages/transformQueryForExternalDatabase.cpp b/src/Storages/transformQueryForExternalDatabase.cpp index fc85bde11d9..7aac138296d 100644 --- a/src/Storages/transformQueryForExternalDatabase.cpp +++ b/src/Storages/transformQueryForExternalDatabase.cpp @@ -419,7 +419,7 @@ String transformQueryForExternalDatabase( throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "No column names for query '{}' to external table '{}.{}'", query_info.query_tree->formatASTForErrorMessage(), database, table); - auto clone_query = getASTForExternalDatabaseFromQueryTree(query_info.query_tree); + auto clone_query = getASTForExternalDatabaseFromQueryTree(query_info.query_tree, query_info.table_expression); return transformQueryForExternalDatabaseImpl( clone_query, diff --git a/src/Storages/transformQueryForExternalDatabaseAnalyzer.cpp b/src/Storages/transformQueryForExternalDatabaseAnalyzer.cpp index 5e0bfdd5f2a..dc1749b3196 100644 --- a/src/Storages/transformQueryForExternalDatabaseAnalyzer.cpp +++ b/src/Storages/transformQueryForExternalDatabaseAnalyzer.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -10,7 +11,7 @@ #include #include #include - +#include #include @@ -20,6 +21,7 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; + extern const int LOGICAL_ERROR; } namespace @@ -55,7 +57,7 @@ public: } -ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree) +ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree, const QueryTreeNodePtr & table_expression) { auto new_tree = query_tree->clone(); @@ -63,6 +65,21 @@ ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tre visitor.visit(new_tree); const auto * query_node = new_tree->as(); + const auto & join_tree = query_node->getJoinTree(); + bool allow_where = true; + if (const auto * join_node = join_tree->as()) + { + if (join_node->getStrictness() != JoinStrictness::All) + allow_where = false; + + if (join_node->getKind() == JoinKind::Left) + allow_where = join_node->getLeftTableExpression()->isEqual(*table_expression); + else if (join_node->getKind() == JoinKind::Right) + allow_where = join_node->getRightTableExpression()->isEqual(*table_expression); + else + allow_where = (join_node->getKind() == JoinKind::Inner); + } + auto query_node_ast = query_node->toAST({ .add_cast_for_constants = false, .fully_qualified_identifiers = false }); const IAST * ast = query_node_ast.get(); @@ -76,7 +93,13 @@ ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tre if (union_ast->list_of_selects->children.size() != 1) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "QueryNode AST is not a single ASTSelectQuery, got {}", union_ast->list_of_selects->children.size()); - return union_ast->list_of_selects->children.at(0); + ASTPtr select_query = union_ast->list_of_selects->children.at(0); + auto * select_query_typed = select_query->as(); + if (!select_query_typed) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected ASTSelectQuery, got {}", select_query ? select_query->formatForErrorMessage() : "nullptr"); + if (!allow_where) + select_query_typed->setExpression(ASTSelectQuery::Expression::WHERE, nullptr); + return select_query; } } diff --git a/src/Storages/transformQueryForExternalDatabaseAnalyzer.h b/src/Storages/transformQueryForExternalDatabaseAnalyzer.h index f8983619d1f..7d8bf99646b 100644 --- a/src/Storages/transformQueryForExternalDatabaseAnalyzer.h +++ b/src/Storages/transformQueryForExternalDatabaseAnalyzer.h @@ -6,6 +6,6 @@ namespace DB { -ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree); +ASTPtr getASTForExternalDatabaseFromQueryTree(const QueryTreeNodePtr & query_tree, const QueryTreeNodePtr & table_expression); } diff --git a/tests/integration/test_storage_postgresql/test.py b/tests/integration/test_storage_postgresql/test.py index d9f3a9917ab..ffcff36c47e 100644 --- a/tests/integration/test_storage_postgresql/test.py +++ b/tests/integration/test_storage_postgresql/test.py @@ -834,6 +834,60 @@ def test_literal_escaping(started_cluster): cursor.execute(f"DROP TABLE escaping") +def test_filter_pushdown(started_cluster): + cursor = started_cluster.postgres_conn.cursor() + cursor.execute("CREATE SCHEMA test_filter_pushdown") + cursor.execute( + "CREATE TABLE test_filter_pushdown.test_table (id integer, value integer)" + ) + cursor.execute( + "INSERT INTO test_filter_pushdown.test_table VALUES (1, 10), (1, 110), (2, 0), (3, 33), (4, 0)" + ) + + node1.query( + """ + CREATE TABLE test_filter_pushdown_pg_table (id UInt32, value UInt32) + ENGINE PostgreSQL('postgres1:5432', 'postgres', 'test_table', 'postgres', 'mysecretpassword', 'test_filter_pushdown'); + """ + ) + + node1.query( + """ + CREATE TABLE test_filter_pushdown_local_table (id UInt32, value UInt32) ENGINE Memory AS SELECT * FROM test_filter_pushdown_pg_table + """ + ) + + node1.query( + "CREATE TABLE ch_table (id UInt32, pg_id UInt32) ENGINE MergeTree ORDER BY id" + ) + node1.query("INSERT INTO ch_table VALUES (1, 1), (2, 2), (3, 1), (4, 2), (5, 999)") + + def compare_results(query, **kwargs): + result1 = node1.query( + query.format(pg_table="test_filter_pushdown_pg_table", **kwargs) + ) + result2 = node1.query( + query.format(pg_table="test_filter_pushdown_local_table", **kwargs) + ) + assert result1 == result2 + + for kind in ["INNER", "LEFT", "RIGHT", "FULL"]: + for value in [0, 10]: + compare_results( + "SELECT * FROM ch_table {kind} JOIN {pg_table} as p ON ch_table.pg_id = p.id WHERE value = {value} ORDER BY ALL", + kind=kind, + value=value, + ) + + compare_results( + "SELECT * FROM {pg_table} as p {kind} JOIN ch_table ON ch_table.pg_id = p.id WHERE value = {value} ORDER BY ALL", + kind=kind, + value=value, + ) + + cursor.execute("DROP SCHEMA test_filter_pushdown CASCADE") + + if __name__ == "__main__": cluster.start() input("Cluster created, press any key to destroy...") From 2af0edd9e9a509e1bffa15e8da8454a4feb7f0ed Mon Sep 17 00:00:00 2001 From: Robert Schulze Date: Mon, 15 Jul 2024 08:47:08 +0000 Subject: [PATCH 07/12] Some minor fixups --- base/base/cgroupsv2.cpp | 14 +++++++------- base/base/cgroupsv2.h | 7 +++---- base/base/getMemoryAmount.cpp | 3 +-- src/Common/CgroupsMemoryUsageObserver.cpp | 12 ++++++------ src/Common/getNumberOfPhysicalCPUCores.cpp | 2 +- 5 files changed, 18 insertions(+), 20 deletions(-) diff --git a/base/base/cgroupsv2.cpp b/base/base/cgroupsv2.cpp index 466ebbc3ffb..87f62bf377d 100644 --- a/base/base/cgroupsv2.cpp +++ b/base/base/cgroupsv2.cpp @@ -3,8 +3,9 @@ #include #include -#include +#include +namespace fs = std::filesystem; bool cgroupsV2Enabled() { @@ -13,11 +14,11 @@ bool cgroupsV2Enabled() { /// This file exists iff the host has cgroups v2 enabled. auto controllers_file = default_cgroups_mount / "cgroup.controllers"; - if (!std::filesystem::exists(controllers_file)) + if (!fs::exists(controllers_file)) return false; return true; } - catch (const std::filesystem::filesystem_error &) /// all "underlying OS API errors", typically: permission denied + catch (const fs::filesystem_error &) /// all "underlying OS API errors", typically: permission denied { return false; /// not logging the exception as most callers fall back to cgroups v1 } @@ -33,7 +34,7 @@ bool cgroupsV2MemoryControllerEnabled() /// According to https://docs.kernel.org/admin-guide/cgroup-v2.html, file "cgroup.controllers" defines which controllers are available /// for the current + child cgroups. The set of available controllers can be restricted from level to level using file /// "cgroups.subtree_control". It is therefore sufficient to check the bottom-most nested "cgroup.controllers" file. - auto cgroup_dir = currentCGroupV2Path(); + fs::path cgroup_dir = cgroupV2PathOfProcess(); if (cgroup_dir.empty()) return false; std::ifstream controllers_file(cgroup_dir / "cgroup.controllers"); @@ -47,7 +48,7 @@ bool cgroupsV2MemoryControllerEnabled() #endif } -std::filesystem::path currentCGroupV2Path() +fs::path cgroupV2PathOfProcess() { #if defined(OS_LINUX) chassert(cgroupsV2Enabled()); @@ -63,9 +64,8 @@ std::filesystem::path currentCGroupV2Path() static const std::string v2_prefix = "0::/"; if (!cgroup.starts_with(v2_prefix)) return {}; - - // the 'root' cgroup can have empty path, which is valid cgroup = cgroup.substr(v2_prefix.length()); + /// Note: The 'root' cgroup can have an empty cgroup name, this is valid return default_cgroups_mount / cgroup; #else return {}; diff --git a/base/base/cgroupsv2.h b/base/base/cgroupsv2.h index 2c58682ce31..cfb916ff358 100644 --- a/base/base/cgroupsv2.h +++ b/base/base/cgroupsv2.h @@ -1,7 +1,6 @@ #pragma once #include -#include #if defined(OS_LINUX) /// I think it is possible to mount the cgroups hierarchy somewhere else (e.g. when in containers). @@ -16,7 +15,7 @@ bool cgroupsV2Enabled(); /// Assumes that cgroupsV2Enabled() is enabled. bool cgroupsV2MemoryControllerEnabled(); -/// Detects which cgroup the process belong and returns the path to it in sysfs (for cgroups v2). -/// Returns an empty path if the cgroup cannot be determined. +/// Detects which cgroup v2 the process belongs to and returns the filesystem path to the cgroup. +/// Returns an empty path the cgroup cannot be determined. /// Assumes that cgroupsV2Enabled() is enabled. -std::filesystem::path currentCGroupV2Path(); +std::filesystem::path cgroupV2PathOfProcess(); diff --git a/base/base/getMemoryAmount.cpp b/base/base/getMemoryAmount.cpp index 9bd5ad75445..afdb6ba068a 100644 --- a/base/base/getMemoryAmount.cpp +++ b/base/base/getMemoryAmount.cpp @@ -23,8 +23,7 @@ std::optional getCgroupsV2MemoryLimit() if (!cgroupsV2MemoryControllerEnabled()) return {}; - auto current_cgroup = currentCGroupV2Path(); - + std::filesystem::path current_cgroup = cgroupV2PathOfProcess(); if (current_cgroup.empty()) return {}; diff --git a/src/Common/CgroupsMemoryUsageObserver.cpp b/src/Common/CgroupsMemoryUsageObserver.cpp index e034319b21f..02bde0d80b7 100644 --- a/src/Common/CgroupsMemoryUsageObserver.cpp +++ b/src/Common/CgroupsMemoryUsageObserver.cpp @@ -25,6 +25,7 @@ #endif using namespace DB; +namespace fs = std::filesystem; namespace DB { @@ -69,7 +70,7 @@ uint64_t readMetricFromStatFile(ReadBufferFromFile & buf, const std::string & ke struct CgroupsV1Reader : ICgroupsReader { - explicit CgroupsV1Reader(const std::filesystem::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { } + explicit CgroupsV1Reader(const fs::path & stat_file_dir) : buf(stat_file_dir / "memory.stat") { } uint64_t readMemoryUsage() override { @@ -85,7 +86,7 @@ private: struct CgroupsV2Reader : ICgroupsReader { - explicit CgroupsV2Reader(const std::filesystem::path & stat_file_dir) + explicit CgroupsV2Reader(const fs::path & stat_file_dir) : current_buf(stat_file_dir / "memory.current"), stat_buf(stat_file_dir / "memory.stat") { } @@ -129,8 +130,7 @@ std::optional getCgroupsV2Path() if (!cgroupsV2MemoryControllerEnabled()) return {}; - auto current_cgroup = currentCGroupV2Path(); - + fs::path current_cgroup = cgroupV2PathOfProcess(); if (current_cgroup.empty()) return {}; @@ -140,7 +140,7 @@ std::optional getCgroupsV2Path() { const auto current_path = current_cgroup / "memory.current"; const auto stat_path = current_cgroup / "memory.stat"; - if (std::filesystem::exists(current_path) && std::filesystem::exists(stat_path)) + if (fs::exists(current_path) && fs::exists(stat_path)) return {current_cgroup}; current_cgroup = current_cgroup.parent_path(); } @@ -150,7 +150,7 @@ std::optional getCgroupsV2Path() std::optional getCgroupsV1Path() { auto path = default_cgroups_mount / "memory/memory.stat"; - if (!std::filesystem::exists(path)) + if (!fs::exists(path)) return {}; return {default_cgroups_mount / "memory"}; } diff --git a/src/Common/getNumberOfPhysicalCPUCores.cpp b/src/Common/getNumberOfPhysicalCPUCores.cpp index b16c635f23e..34a1add2f0e 100644 --- a/src/Common/getNumberOfPhysicalCPUCores.cpp +++ b/src/Common/getNumberOfPhysicalCPUCores.cpp @@ -38,7 +38,7 @@ uint32_t getCGroupLimitedCPUCores(unsigned default_cpu_count) if (cgroupsV2Enabled()) { /// First, we identify the path of the cgroup the process belongs - auto cgroup_path = currentCGroupV2Path(); + std::filesystem::path cgroup_path = cgroupV2PathOfProcess(); if (cgroup_path.empty()) return default_cpu_count; From e3d28f92688d63fd7d417553ad77944a380d3d30 Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 15 Jul 2024 15:12:23 +0200 Subject: [PATCH 08/12] Update 02443_detach_attach_partition.sh --- tests/queries/0_stateless/02443_detach_attach_partition.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/02443_detach_attach_partition.sh b/tests/queries/0_stateless/02443_detach_attach_partition.sh index d72d771a150..6a47b7d8d61 100755 --- a/tests/queries/0_stateless/02443_detach_attach_partition.sh +++ b/tests/queries/0_stateless/02443_detach_attach_partition.sh @@ -73,7 +73,7 @@ kill -TERM $PID_1 && kill -TERM $PID_2 && kill -TERM $PID_3 && kill -TERM $PID_4 wait $CLICKHOUSE_CLIENT -q "SELECT '$CLICKHOUSE_DATABASE', 'threads finished'" -wait_for_queries_to_finish +wait_for_queries_to_finish 60 $CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table0" $CLICKHOUSE_CLIENT -q "SYSTEM SYNC REPLICA alter_table1" From cfb2183d118b783d142a721c89c04b1835dfa3ed Mon Sep 17 00:00:00 2001 From: Alexander Tokmakov Date: Mon, 15 Jul 2024 17:25:43 +0200 Subject: [PATCH 09/12] Update 01396_inactive_replica_cleanup_nodes_zookeeper.sh --- .../01396_inactive_replica_cleanup_nodes_zookeeper.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes_zookeeper.sh b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes_zookeeper.sh index b81bb75891d..bff85b3e29f 100755 --- a/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes_zookeeper.sh +++ b/tests/queries/0_stateless/01396_inactive_replica_cleanup_nodes_zookeeper.sh @@ -30,7 +30,7 @@ $CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 1 --min_inser for _ in {1..60}; do $CLICKHOUSE_CLIENT --query "SYSTEM FLUSH LOGS" - [[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || '$CLICKHOUSE_DATABASE' || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%'") -gt $((SCALE - 100)) ]] && break; + [[ $($CLICKHOUSE_CLIENT --query "SELECT sum(toUInt32(extract(message, 'Removed (\d+) old log entries'))) FROM system.text_log WHERE event_date >= yesterday() AND logger_name LIKE '%' || '$CLICKHOUSE_DATABASE' || '%r1%(ReplicatedMergeTreeCleanupThread)%' AND message LIKE '%Removed % old log entries%'") -gt $((SCALE - 10)) ]] && break; sleep 1 done From c05b2bfd39bdb12290f2c698bfdbdec41021a45e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Tue, 16 Jul 2024 05:33:33 +0200 Subject: [PATCH 10/12] More clarity in the test `03001_consider_lwd_when_merge` --- tests/queries/0_stateless/03001_consider_lwd_when_merge.sql | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/03001_consider_lwd_when_merge.sql b/tests/queries/0_stateless/03001_consider_lwd_when_merge.sql index 988d7058f21..2b10c72ae1b 100644 --- a/tests/queries/0_stateless/03001_consider_lwd_when_merge.sql +++ b/tests/queries/0_stateless/03001_consider_lwd_when_merge.sql @@ -7,12 +7,14 @@ SETTINGS max_bytes_to_merge_at_max_space_in_pool = 80000, exclude_deleted_rows_f INSERT INTO lwd_merge SELECT number FROM numbers(10000); INSERT INTO lwd_merge SELECT number FROM numbers(10000, 10000); -OPTIMIZE TABLE lwd_merge; +SET optimize_throw_if_noop = 1; + +OPTIMIZE TABLE lwd_merge; -- { serverError CANNOT_ASSIGN_OPTIMIZE } SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1; DELETE FROM lwd_merge WHERE id % 10 > 0; -OPTIMIZE TABLE lwd_merge; +OPTIMIZE TABLE lwd_merge; -- { serverError CANNOT_ASSIGN_OPTIMIZE } SELECT count() FROM system.parts WHERE database = currentDatabase() AND table = 'lwd_merge' AND active = 1; ALTER TABLE lwd_merge MODIFY SETTING exclude_deleted_rows_for_part_size_in_merge = 1; From f3047cc78dc310e72b913964429f925876f27fd1 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Fri, 12 Jul 2024 10:35:16 +0800 Subject: [PATCH 11/12] fixed type mismatch in cross join --- src/Interpreters/HashJoin/HashJoin.cpp | 2 +- tests/queries/0_stateless/00202_cross_join.reference | 1 + tests/queries/0_stateless/00202_cross_join.sql | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/HashJoin/HashJoin.cpp b/src/Interpreters/HashJoin/HashJoin.cpp index fa8ebd2c0f0..0c7cad4360d 100644 --- a/src/Interpreters/HashJoin/HashJoin.cpp +++ b/src/Interpreters/HashJoin/HashJoin.cpp @@ -125,7 +125,7 @@ HashJoin::HashJoin(std::shared_ptr table_join_, const Block & right_s if (isCrossOrComma(kind)) { data->type = Type::CROSS; - sample_block_with_columns_to_add = right_sample_block; + sample_block_with_columns_to_add = materializeBlock(right_sample_block); } else if (table_join->getClauses().empty()) { diff --git a/tests/queries/0_stateless/00202_cross_join.reference b/tests/queries/0_stateless/00202_cross_join.reference index a8db281730a..e134631383d 100644 --- a/tests/queries/0_stateless/00202_cross_join.reference +++ b/tests/queries/0_stateless/00202_cross_join.reference @@ -43,3 +43,4 @@ 2 2 2 3 2 4 +1 1 1 1 1 diff --git a/tests/queries/0_stateless/00202_cross_join.sql b/tests/queries/0_stateless/00202_cross_join.sql index 8d62c56b3f1..e4929d038c3 100644 --- a/tests/queries/0_stateless/00202_cross_join.sql +++ b/tests/queries/0_stateless/00202_cross_join.sql @@ -5,3 +5,5 @@ SELECT x, y FROM (SELECT number AS x FROM system.numbers LIMIT 3) js1 CROSS JOIN SET allow_experimental_analyzer = 1; SELECT x, y FROM (SELECT number AS x FROM system.numbers LIMIT 3) js1 CROSS JOIN (SELECT number AS y FROM system.numbers LIMIT 5) js2; + +SELECT * FROM ( SELECT 1 AS a, toLowCardinality(1), 1) AS t1 CROSS JOIN (SELECT toLowCardinality(1 AS a), 1 AS b) AS t2; From 9c1532e02f5f1dfeb2f3833afe585e26d08ccbf0 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 15 Jul 2024 09:46:51 +0800 Subject: [PATCH 12/12] add tests --- tests/queries/0_stateless/00202_cross_join.reference | 1 - tests/queries/0_stateless/00202_cross_join.sql | 2 -- .../queries/0_stateless/03205_column_type_check.reference | 2 ++ tests/queries/0_stateless/03205_column_type_check.sql | 7 +++++++ 4 files changed, 9 insertions(+), 3 deletions(-) create mode 100644 tests/queries/0_stateless/03205_column_type_check.reference create mode 100644 tests/queries/0_stateless/03205_column_type_check.sql diff --git a/tests/queries/0_stateless/00202_cross_join.reference b/tests/queries/0_stateless/00202_cross_join.reference index e134631383d..a8db281730a 100644 --- a/tests/queries/0_stateless/00202_cross_join.reference +++ b/tests/queries/0_stateless/00202_cross_join.reference @@ -43,4 +43,3 @@ 2 2 2 3 2 4 -1 1 1 1 1 diff --git a/tests/queries/0_stateless/00202_cross_join.sql b/tests/queries/0_stateless/00202_cross_join.sql index e4929d038c3..8d62c56b3f1 100644 --- a/tests/queries/0_stateless/00202_cross_join.sql +++ b/tests/queries/0_stateless/00202_cross_join.sql @@ -5,5 +5,3 @@ SELECT x, y FROM (SELECT number AS x FROM system.numbers LIMIT 3) js1 CROSS JOIN SET allow_experimental_analyzer = 1; SELECT x, y FROM (SELECT number AS x FROM system.numbers LIMIT 3) js1 CROSS JOIN (SELECT number AS y FROM system.numbers LIMIT 5) js2; - -SELECT * FROM ( SELECT 1 AS a, toLowCardinality(1), 1) AS t1 CROSS JOIN (SELECT toLowCardinality(1 AS a), 1 AS b) AS t2; diff --git a/tests/queries/0_stateless/03205_column_type_check.reference b/tests/queries/0_stateless/03205_column_type_check.reference new file mode 100644 index 00000000000..3b6c93a0610 --- /dev/null +++ b/tests/queries/0_stateless/03205_column_type_check.reference @@ -0,0 +1,2 @@ +1 nan 1048575 2 +1 1 1 1 1 diff --git a/tests/queries/0_stateless/03205_column_type_check.sql b/tests/queries/0_stateless/03205_column_type_check.sql new file mode 100644 index 00000000000..ab122821eb0 --- /dev/null +++ b/tests/queries/0_stateless/03205_column_type_check.sql @@ -0,0 +1,7 @@ +SELECT * FROM (SELECT toUInt256(1)) AS t, (SELECT greatCircleAngle(toLowCardinality(toNullable(toUInt256(1048575))), 257, -9223372036854775808, 1048576), 1048575, materialize(2)) AS u; + + +SET join_algorithm='hash'; +SET allow_experimental_join_condition=1; +SELECT * FROM ( SELECT 1 AS a, toLowCardinality(1), 1) AS t1 CROSS JOIN (SELECT toLowCardinality(1 AS a), 1 AS b) AS t2; +