diff --git a/docs/en/operations/system-tables/table_engines.md b/docs/en/operations/system-tables/table_engines.md index 4ca1fc657ee..30122cb133e 100644 --- a/docs/en/operations/system-tables/table_engines.md +++ b/docs/en/operations/system-tables/table_engines.md @@ -11,6 +11,7 @@ This table contains the following columns (the column type is shown in brackets) - `supports_sort_order` (UInt8) — Flag that indicates if table engine supports clauses `PARTITION_BY`, `PRIMARY_KEY`, `ORDER_BY` and `SAMPLE_BY`. - `supports_replication` (UInt8) — Flag that indicates if table engine supports [data replication](../../engines/table-engines/mergetree-family/replication.md). - `supports_duduplication` (UInt8) — Flag that indicates if table engine supports data deduplication. +- `supports_parallel_insert` (UInt8) — Flag that indicates if table engine supports parallel insert (see [`max_insert_threads`](../../operations/settings/settings.md#settings-max-insert-threads) setting). Example: @@ -21,11 +22,11 @@ WHERE name in ('Kafka', 'MergeTree', 'ReplicatedCollapsingMergeTree') ``` ``` text -┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┐ -│ Kafka │ 1 │ 0 │ 0 │ 0 │ 0 │ 0 │ -│ MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │ -│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ -└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┘ +┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┬─supports_parallel_insert─┐ +│ MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │ 1 │ +│ Kafka │ 1 │ 0 │ 0 │ 0 │ 0 │ 0 │ 0 │ +│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ +└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┴──────────────────────────┘ ``` **See also** diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp index 80ad8da837c..ca2a3db193f 100644 --- a/programs/client/Client.cpp +++ b/programs/client/Client.cpp @@ -801,7 +801,8 @@ private: connection->setDefaultDatabase(connection_parameters.default_database); ReadBufferFromFile in(queries_file); readStringUntilEOF(text, in); - processMultiQuery(text); + if (!processMultiQuery(text)) + break; } return; } @@ -984,7 +985,8 @@ private: if (query_fuzzer_runs) { - processWithFuzzing(full_query); + if (!processWithFuzzing(full_query)) + return false; } else { @@ -1034,7 +1036,8 @@ private: } - void processWithFuzzing(const String & text) + /// Returns false when server is not available. + bool processWithFuzzing(const String & text) { ASTPtr orig_ast; @@ -1052,7 +1055,7 @@ private: if (!orig_ast) { // Can't continue after a parsing error - return; + return true; } // Don't repeat inserts, the tables grow too big. Also don't repeat @@ -1147,7 +1150,7 @@ private: // Probably the server is dead because we found an assertion // failure. Fail fast. fmt::print(stderr, "Lost connection to the server\n"); - return; + return false; } // The server is still alive so we're going to continue fuzzing. @@ -1173,6 +1176,8 @@ private: fuzz_base = ast_to_process; } } + + return true; } void processTextAsSingleQuery(const String & text_) diff --git a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h index d112324f693..a59e1c8b26a 100644 --- a/src/AggregateFunctions/AggregateFunctionWindowFunnel.h +++ b/src/AggregateFunctions/AggregateFunctionWindowFunnel.h @@ -33,7 +33,7 @@ template struct AggregateFunctionWindowFunnelData { using TimestampEvent = std::pair; - using TimestampEvents = PODArray; + using TimestampEvents = PODArrayWithStackMemory; using Comparator = ComparePair; bool sorted = true; diff --git a/src/Core/BackgroundSchedulePool.cpp b/src/Core/BackgroundSchedulePool.cpp index 6b568ea937f..b3aedd6c875 100644 --- a/src/Core/BackgroundSchedulePool.cpp +++ b/src/Core/BackgroundSchedulePool.cpp @@ -1,5 +1,4 @@ #include "BackgroundSchedulePool.h" -#include #include #include #include diff --git a/src/Core/MySQL/Authentication.cpp b/src/Core/MySQL/Authentication.cpp index b0f5f8ccae2..e685ad0394d 100644 --- a/src/Core/MySQL/Authentication.cpp +++ b/src/Core/MySQL/Authentication.cpp @@ -6,7 +6,6 @@ #include #include -#include #include #include diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index 42ece547e1c..cfb4c4e9646 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -120,9 +120,6 @@ public: /// Returns true if the storage supports deduplication of inserted data blocks. virtual bool supportsDeduplication() const { return false; } - /// Returns true if the storage supports settings. - virtual bool supportsSettings() const { return false; } - /// Returns true if the blocks shouldn't be pushed to associated views on insert. virtual bool noPushingToViews() const { return false; } diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 8ec8e718011..53871990810 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -36,7 +36,6 @@ class StorageKafka final : public ext::shared_ptr_helper, public I public: std::string getName() const override { return "Kafka"; } - bool supportsSettings() const override { return true; } bool noPushingToViews() const override { return true; } void startup() override; diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index e5ffe8c025b..52c0b61b977 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -357,7 +357,6 @@ public: || merging_params.mode == MergingParams::VersionedCollapsing; } - bool supportsSettings() const override { return true; } NamesAndTypesList getVirtuals() const override; bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context &, const StorageMetadataPtr & metadata_snapshot) const override; diff --git a/src/Storages/MergeTree/ReplicatedFetchList.h b/src/Storages/MergeTree/ReplicatedFetchList.h index 81d538abf9c..0ab631e53b4 100644 --- a/src/Storages/MergeTree/ReplicatedFetchList.h +++ b/src/Storages/MergeTree/ReplicatedFetchList.h @@ -3,9 +3,9 @@ #include #include #include -#include #include + namespace CurrentMetrics { extern const Metric ReplicatedFetch; diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index a2429cead3d..0854cc3653c 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -756,6 +756,7 @@ void registerStorageMergeTree(StorageFactory & factory) .supports_skipping_indices = true, .supports_sort_order = true, .supports_ttl = true, + .supports_parallel_insert = true, }; factory.registerStorage("MergeTree", create, features); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.h b/src/Storages/RabbitMQ/StorageRabbitMQ.h index a46da6072af..893c5167a97 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.h +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.h @@ -29,7 +29,6 @@ class StorageRabbitMQ final: public ext::shared_ptr_helper, pub public: std::string getName() const override { return "RabbitMQ"; } - bool supportsSettings() const override { return true; } bool noPushingToViews() const override { return true; } void startup() override; diff --git a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp index 80b25793806..249026d1011 100644 --- a/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp +++ b/src/Storages/RocksDB/StorageEmbeddedRocksDB.cpp @@ -367,6 +367,7 @@ void registerStorageEmbeddedRocksDB(StorageFactory & factory) { StorageFactory::StorageFeatures features{ .supports_sort_order = true, + .supports_parallel_insert = true, }; factory.registerStorage("EmbeddedRocksDB", create, features); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index e24db51688e..34787556649 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -996,6 +996,9 @@ void registerStorageBuffer(StorageFactory & factory) StorageBuffer::Thresholds{max_time, max_rows, max_bytes}, destination_id, static_cast(args.local_context.getSettingsRef().insert_allow_materialized_columns)); + }, + { + .supports_parallel_insert = true, }); } diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 1390455edb1..dd99d0f0f27 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -1005,6 +1005,7 @@ void registerStorageDistributed(StorageFactory & factory) args.attach); }, { + .supports_parallel_insert = true, .source_access_type = AccessType::REMOTE, }); } diff --git a/src/Storages/StorageFactory.h b/src/Storages/StorageFactory.h index de9060769cb..18dd24e10db 100644 --- a/src/Storages/StorageFactory.h +++ b/src/Storages/StorageFactory.h @@ -47,14 +47,20 @@ public: bool has_force_restore_data_flag; }; + /// Analog of the IStorage::supports*() helpers + /// (But the former cannot be replaced with StorageFeatures due to nesting) struct StorageFeatures { bool supports_settings = false; bool supports_skipping_indices = false; bool supports_sort_order = false; bool supports_ttl = false; + /// See also IStorage::supportsReplication() bool supports_replication = false; + /// See also IStorage::supportsDeduplication() bool supports_deduplication = false; + /// See also IStorage::supportsParallelInsert() + bool supports_parallel_insert = false; AccessType source_access_type = AccessType::NONE; }; @@ -85,6 +91,7 @@ public: .supports_ttl = false, .supports_replication = false, .supports_deduplication = false, + .supports_parallel_insert = false, .source_access_type = AccessType::NONE, }); diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index 93f00206e6b..8651caecdfa 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -303,6 +303,9 @@ void registerStorageMemory(StorageFactory & factory) ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); return StorageMemory::create(args.table_id, args.columns, args.constraints); + }, + { + .supports_parallel_insert = true, }); } diff --git a/src/Storages/StorageNull.cpp b/src/Storages/StorageNull.cpp index 499f7329cd9..f324d502834 100644 --- a/src/Storages/StorageNull.cpp +++ b/src/Storages/StorageNull.cpp @@ -29,6 +29,9 @@ void registerStorageNull(StorageFactory & factory) ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); return StorageNull::create(args.table_id, args.columns, args.constraints); + }, + { + .supports_parallel_insert = true, }); } diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index b7b948af4ba..fed9dd04e76 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -25,7 +25,6 @@ public: bool supportsReplication() const override { return getNested()->supportsReplication(); } bool supportsParallelInsert() const override { return getNested()->supportsParallelInsert(); } bool supportsDeduplication() const override { return getNested()->supportsDeduplication(); } - bool supportsSettings() const override { return getNested()->supportsSettings(); } bool noPushingToViews() const override { return getNested()->noPushingToViews(); } bool hasEvenlyDistributedRead() const override { return getNested()->hasEvenlyDistributedRead(); } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 8a802037f61..ebf1e43ca04 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -4958,8 +4958,13 @@ void StorageReplicatedMergeTree::fetchPartition( const String & from_, const Context & query_context) { - String auxiliary_zookeeper_name = extractZooKeeperName(from_); - String from = extractZooKeeperPath(from_); + Macros::MacroExpansionInfo info; + info.expand_special_macros_only = false; + info.table_id = getStorageID(); + info.table_id.uuid = UUIDHelpers::Nil; + auto expand_from = query_context.getMacros()->expand(from_, info); + String auxiliary_zookeeper_name = extractZooKeeperName(expand_from); + String from = extractZooKeeperPath(expand_from); if (from.empty()) throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); diff --git a/src/Storages/System/StorageSystemTableEngines.cpp b/src/Storages/System/StorageSystemTableEngines.cpp index e63923f69b6..3f06faf6736 100644 --- a/src/Storages/System/StorageSystemTableEngines.cpp +++ b/src/Storages/System/StorageSystemTableEngines.cpp @@ -8,26 +8,31 @@ namespace DB NamesAndTypesList StorageSystemTableEngines::getNamesAndTypes() { - return {{"name", std::make_shared()}, - {"supports_settings", std::make_shared()}, - {"supports_skipping_indices", std::make_shared()}, - {"supports_sort_order", std::make_shared()}, - {"supports_ttl", std::make_shared()}, - {"supports_replication", std::make_shared()}, - {"supports_deduplication", std::make_shared()}}; + return { + {"name", std::make_shared()}, + {"supports_settings", std::make_shared()}, + {"supports_skipping_indices", std::make_shared()}, + {"supports_sort_order", std::make_shared()}, + {"supports_ttl", std::make_shared()}, + {"supports_replication", std::make_shared()}, + {"supports_deduplication", std::make_shared()}, + {"supports_parallel_insert", std::make_shared()}, + }; } void StorageSystemTableEngines::fillData(MutableColumns & res_columns, const Context &, const SelectQueryInfo &) const { for (const auto & pair : StorageFactory::instance().getAllStorages()) { - res_columns[0]->insert(pair.first); - res_columns[1]->insert(pair.second.features.supports_settings); - res_columns[2]->insert(pair.second.features.supports_skipping_indices); - res_columns[3]->insert(pair.second.features.supports_sort_order); - res_columns[4]->insert(pair.second.features.supports_ttl); - res_columns[5]->insert(pair.second.features.supports_replication); - res_columns[6]->insert(pair.second.features.supports_deduplication); + int i = 0; + res_columns[i++]->insert(pair.first); + res_columns[i++]->insert(pair.second.features.supports_settings); + res_columns[i++]->insert(pair.second.features.supports_skipping_indices); + res_columns[i++]->insert(pair.second.features.supports_sort_order); + res_columns[i++]->insert(pair.second.features.supports_ttl); + res_columns[i++]->insert(pair.second.features.supports_replication); + res_columns[i++]->insert(pair.second.features.supports_deduplication); + res_columns[i++]->insert(pair.second.features.supports_parallel_insert); } } diff --git a/src/Storages/tests/gtest_transform_query_for_external_database.cpp b/src/Storages/tests/gtest_transform_query_for_external_database.cpp index 835aebab900..99dfc55ed69 100644 --- a/src/Storages/tests/gtest_transform_query_for_external_database.cpp +++ b/src/Storages/tests/gtest_transform_query_for_external_database.cpp @@ -80,6 +80,24 @@ TEST(TransformQueryForExternalDatabase, InWithSingleElement) state.context, state.columns); } +TEST(TransformQueryForExternalDatabase, InWithTable) +{ + const State & state = State::instance(); + + check("SELECT column FROM test.table WHERE 1 IN external_table", + R"(SELECT "column" FROM "test"."table")", + state.context, state.columns); + check("SELECT column FROM test.table WHERE 1 IN (x)", + R"(SELECT "column" FROM "test"."table")", + state.context, state.columns); + check("SELECT column, field, value FROM test.table WHERE column IN (field, value)", + R"(SELECT "column", "field", "value" FROM "test"."table" WHERE "column" IN ("field", "value"))", + state.context, state.columns); + check("SELECT column FROM test.table WHERE column NOT IN hello AND column = 123", + R"(SELECT "column" FROM "test"."table" WHERE ("column" = 123))", + state.context, state.columns); +} + TEST(TransformQueryForExternalDatabase, Like) { const State & state = State::instance(); diff --git a/src/Storages/transformQueryForExternalDatabase.cpp b/src/Storages/transformQueryForExternalDatabase.cpp index f35fb1c8a34..42daf8cfc26 100644 --- a/src/Storages/transformQueryForExternalDatabase.cpp +++ b/src/Storages/transformQueryForExternalDatabase.cpp @@ -138,6 +138,12 @@ bool isCompatible(const IAST & node) if (name == "tuple" && function->arguments->children.size() <= 1) return false; + /// If the right hand side of IN is an identifier (example: x IN table), then it's not compatible. + if ((name == "in" || name == "notIn") + && (function->arguments->children.size() != 2 + || function->arguments->children[1]->as())) + return false; + for (const auto & expr : function->arguments->children) if (!isCompatible(*expr)) return false; diff --git a/tests/performance/optimize_window_funnel.xml b/tests/performance/optimize_window_funnel.xml new file mode 100644 index 00000000000..0d928fd0f4e --- /dev/null +++ b/tests/performance/optimize_window_funnel.xml @@ -0,0 +1,12 @@ + + CREATE TABLE action(uid UInt64, event String, time DateTime) ENGINE = MergeTree ORDER BY uid + + INSERT INTO action SELECT arrayJoin(groupArray(number)), 'a', now() from numbers(1000000) + INSERT INTO action SELECT arrayJoin(groupArray(number)), 'b', now() + INTERVAL 6 hour from numbers(1000000) + INSERT INTO action SELECT arrayJoin(groupArray(number)), 'c', now() + INTERVAL 12 hour from numbers(1000000) + INSERT INTO action SELECT arrayJoin(groupArray(number)), 'd', now() + INTERVAL 18 hour from numbers(1000000) + + SELECT level, count() from (select windowFunnel(86400)(time, event='a', event='b', event='c', event='d') level from action group by uid) group by level FORMAT Null + + DROP TABLE IF EXISTS action + diff --git a/tests/queries/0_stateless/01645_system_table_engines.reference b/tests/queries/0_stateless/01645_system_table_engines.reference new file mode 100644 index 00000000000..afe0584bea1 --- /dev/null +++ b/tests/queries/0_stateless/01645_system_table_engines.reference @@ -0,0 +1,4 @@ +┌─name──────────────────────────┬─supports_settings─┬─supports_skipping_indices─┬─supports_sort_order─┬─supports_ttl─┬─supports_replication─┬─supports_deduplication─┬─supports_parallel_insert─┐ +│ MergeTree │ 1 │ 1 │ 1 │ 1 │ 0 │ 0 │ 1 │ +│ ReplicatedCollapsingMergeTree │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ +└───────────────────────────────┴───────────────────┴───────────────────────────┴─────────────────────┴──────────────┴──────────────────────┴────────────────────────┴──────────────────────────┘ diff --git a/tests/queries/0_stateless/01645_system_table_engines.sql b/tests/queries/0_stateless/01645_system_table_engines.sql new file mode 100644 index 00000000000..5e8eef5508b --- /dev/null +++ b/tests/queries/0_stateless/01645_system_table_engines.sql @@ -0,0 +1 @@ +SELECT * FROM system.table_engines WHERE name in ('MergeTree', 'ReplicatedCollapsingMergeTree') FORMAT PrettyCompactNoEscapes;