From e51bee58ecf8f684abfb0f1f9ac1806fb47b5efd Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 24 Apr 2022 22:32:45 +0200 Subject: [PATCH 001/654] Remove useless "install" from CMake (step 1) --- CMakeLists.txt | 1 - base/glibc-compatibility/CMakeLists.txt | 6 ---- base/harmful/CMakeLists.txt | 1 - tests/CMakeLists.txt | 26 --------------- tests/integration/CMakeLists.txt | 24 -------------- utils/CMakeLists.txt | 5 --- utils/config-processor/CMakeLists.txt | 2 -- utils/config-processor/config-processor.cpp | 35 --------------------- utils/report/CMakeLists.txt | 1 - 9 files changed, 101 deletions(-) delete mode 100644 tests/CMakeLists.txt delete mode 100644 tests/integration/CMakeLists.txt delete mode 100644 utils/config-processor/CMakeLists.txt delete mode 100644 utils/config-processor/config-processor.cpp delete mode 100644 utils/report/CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index e8b6e9217d2..bffdd810686 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -577,7 +577,6 @@ include (cmake/print_flags.cmake) add_subdirectory (base) add_subdirectory (src) add_subdirectory (programs) -add_subdirectory (tests) add_subdirectory (utils) include (cmake/sanitize_target_link_libraries.cmake) diff --git a/base/glibc-compatibility/CMakeLists.txt b/base/glibc-compatibility/CMakeLists.txt index ef7ec6d7fc0..37423bb68a6 100644 --- a/base/glibc-compatibility/CMakeLists.txt +++ b/base/glibc-compatibility/CMakeLists.txt @@ -43,12 +43,6 @@ if (GLIBC_COMPATIBILITY) target_link_libraries(global-libs INTERFACE glibc-compatibility ${MEMCPY_LIBRARY}) - install( - TARGETS glibc-compatibility ${MEMCPY_LIBRARY} - EXPORT global - ARCHIVE DESTINATION lib - ) - message (STATUS "Some symbols from glibc will be replaced for compatibility") elseif (CLICKHOUSE_OFFICIAL_BUILD) diff --git a/base/harmful/CMakeLists.txt b/base/harmful/CMakeLists.txt index 399f6ecc625..c19661875be 100644 --- a/base/harmful/CMakeLists.txt +++ b/base/harmful/CMakeLists.txt @@ -1,2 +1 @@ add_library(harmful harmful.c) -install(TARGETS harmful EXPORT global ARCHIVE DESTINATION lib) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt deleted file mode 100644 index 22c89aaafa7..00000000000 --- a/tests/CMakeLists.txt +++ /dev/null @@ -1,26 +0,0 @@ -enable_testing() - -# Run tests with "ninja check" or "make check" -if (TARGET check) - message (STATUS "Target check already exists") -else () - include (${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake) -endif () - -option (ENABLE_CLICKHOUSE_TEST "Install clickhouse-test script and relevant tests scenarios" OFF) - -if (ENABLE_CLICKHOUSE_TEST) - install (PROGRAMS clickhouse-test DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) - install ( - DIRECTORY queries performance config - DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/clickhouse-test - USE_SOURCE_PERMISSIONS - COMPONENT clickhouse - PATTERN "CMakeLists.txt" EXCLUDE - PATTERN ".gitignore" EXCLUDE - ) -endif () - -if (ENABLE_TEST_INTEGRATION) - add_subdirectory (integration) -endif () diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt deleted file mode 100644 index 68c695f57a0..00000000000 --- a/tests/integration/CMakeLists.txt +++ /dev/null @@ -1,24 +0,0 @@ -if(CLICKHOUSE_SPLIT_BINARY) - set (TEST_USE_BINARIES CLICKHOUSE_TESTS_SERVER_BIN_PATH=${ClickHouse_BINARY_DIR}/programs/clickhouse-server CLICKHOUSE_TESTS_CLIENT_BIN_PATH=${ClickHouse_BINARY_DIR}/programs/clickhouse-client) -else() - set (TEST_USE_BINARIES CLICKHOUSE_TESTS_SERVER_BIN_PATH=${ClickHouse_BINARY_DIR}/programs/clickhouse CLICKHOUSE_TESTS_CLIENT_BIN_PATH=${ClickHouse_BINARY_DIR}/programs/clickhouse) -endif() - -find_program(DOCKER_CMD docker) -find_program(DOCKER_COMPOSE_CMD docker-compose) -find_program(PYTEST_CMD pytest) -find_program(SUDO_CMD sudo) - -# will mount only one binary to docker container - build with .so cant work -if(USE_STATIC_LIBRARIES AND DOCKER_CMD) - if(INTEGRATION_USE_RUNNER AND SUDO_CMD) - add_test(NAME integration-runner WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${SUDO_CMD} ${CMAKE_CURRENT_SOURCE_DIR}/runner --binary ${ClickHouse_BINARY_DIR}/programs/clickhouse --configs-dir ${ClickHouse_SOURCE_DIR}/programs/server/) - message(STATUS "Using tests in docker with runner SUDO=${SUDO_CMD}; DOCKER=${DOCKER_CMD};") - endif() - if(NOT INTEGRATION_USE_RUNNER AND DOCKER_COMPOSE_CMD AND PYTEST_CMD) - # To run one test with debug: - # cmake . -DPYTEST_OPT="-ss;test_cluster_copier" - add_test(NAME integration-pytest WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND env ${TEST_USE_BINARIES} "CLICKHOUSE_TESTS_BASE_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/programs/server/" "CLICKHOUSE_TESTS_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/tests/config/" ${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT}) - message(STATUS "Using tests in docker DOCKER=${DOCKER_CMD}; DOCKER_COMPOSE=${DOCKER_COMPOSE_CMD}; PYTEST=${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT}") - endif() -endif() diff --git a/utils/CMakeLists.txt b/utils/CMakeLists.txt index 51300472ed1..d4f22d8065d 100644 --- a/utils/CMakeLists.txt +++ b/utils/CMakeLists.txt @@ -9,11 +9,6 @@ else() endif() include(../cmake/limit_jobs.cmake) -# Utils used in package -add_subdirectory (config-processor) -add_subdirectory (report) - -# Not used in package if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS) add_subdirectory (compressor) add_subdirectory (iotest) diff --git a/utils/config-processor/CMakeLists.txt b/utils/config-processor/CMakeLists.txt deleted file mode 100644 index 76c10b5f2fd..00000000000 --- a/utils/config-processor/CMakeLists.txt +++ /dev/null @@ -1,2 +0,0 @@ -add_executable (config-processor config-processor.cpp) -target_link_libraries(config-processor PRIVATE clickhouse_common_config_no_zookeeper_log) diff --git a/utils/config-processor/config-processor.cpp b/utils/config-processor/config-processor.cpp deleted file mode 100644 index 242a6782b3b..00000000000 --- a/utils/config-processor/config-processor.cpp +++ /dev/null @@ -1,35 +0,0 @@ -#include -#include - -int main(int argc, char ** argv) -{ - try - { - if (argc != 2) - { - std::cerr << "usage: " << argv[0] << " path" << std::endl; - return 3; - } - - DB::ConfigProcessor processor(argv[1], false, true); - DB::XMLDocumentPtr document = processor.processConfig(); - Poco::XML::DOMWriter().writeNode(std::cout, document); - } - catch (Poco::Exception & e) - { - std::cerr << "Exception: " << e.displayText() << std::endl; - return 1; - } - catch (std::exception & e) - { - std::cerr << "std::exception: " << e.what() << std::endl; - return 3; - } - catch (...) - { - std::cerr << "Some exception" << std::endl; - return 2; - } - - return 0; -} diff --git a/utils/report/CMakeLists.txt b/utils/report/CMakeLists.txt deleted file mode 100644 index e39dd155b15..00000000000 --- a/utils/report/CMakeLists.txt +++ /dev/null @@ -1 +0,0 @@ -install (PROGRAMS clickhouse-report DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse) From f5c9d278ad7be8a90d92d66546134575ad54c7e7 Mon Sep 17 00:00:00 2001 From: Han Fei Date: Wed, 9 Aug 2023 22:57:49 +0200 Subject: [PATCH 002/654] use statistic to order prewhere conditions better --- src/Access/Common/AccessType.h | 5 + src/AggregateFunctions/QuantileTDigest.h | 11 + src/CMakeLists.txt | 1 + src/Databases/DatabasesCommon.cpp | 2 + src/Interpreters/InterpreterAlterQuery.cpp | 15 + src/Interpreters/InterpreterCreateQuery.cpp | 17 ++ src/Interpreters/InterpreterCreateQuery.h | 2 + src/Interpreters/InterpreterSelectQuery.cpp | 1 + src/Interpreters/MutationsInterpreter.cpp | 36 ++- src/Interpreters/MutationsInterpreter.h | 5 +- src/Parsers/ASTAlterQuery.cpp | 38 +++ src/Parsers/ASTAlterQuery.h | 9 + src/Parsers/ASTCreateQuery.cpp | 12 + src/Parsers/ASTCreateQuery.h | 3 +- src/Parsers/ASTStatisticDeclaration.cpp | 35 +++ src/Parsers/ASTStatisticDeclaration.h | 26 ++ src/Parsers/ParserAlterQuery.cpp | 70 +++++ src/Parsers/ParserCreateQuery.cpp | 46 +++ src/Parsers/ParserCreateQuery.h | 12 + .../Optimizations/optimizePrewhere.cpp | 1 + src/Storages/AlterCommands.cpp | 100 +++++++ src/Storages/AlterCommands.h | 6 + src/Storages/ColumnDependency.h | 5 +- src/Storages/IStorage.h | 3 + src/Storages/MergeTree/DataPartsExchange.cpp | 2 +- src/Storages/MergeTree/IMergeTreeDataPart.h | 2 + src/Storages/MergeTree/MergeTask.cpp | 5 +- src/Storages/MergeTree/MergeTreeData.cpp | 4 +- .../MergeTree/MergeTreeDataPartCompact.cpp | 3 +- .../MergeTree/MergeTreeDataPartCompact.h | 1 + .../MergeTree/MergeTreeDataPartInMemory.cpp | 5 +- .../MergeTree/MergeTreeDataPartInMemory.h | 1 + .../MergeTree/MergeTreeDataPartWide.cpp | 3 +- .../MergeTree/MergeTreeDataPartWide.h | 1 + .../MergeTreeDataPartWriterCompact.cpp | 6 +- .../MergeTreeDataPartWriterCompact.h | 1 + .../MergeTreeDataPartWriterOnDisk.cpp | 126 +++++++-- .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 30 +- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 14 +- .../MergeTree/MergeTreeDataPartWriterWide.h | 1 + .../MergeTree/MergeTreeDataWriter.cpp | 2 + .../MergeTree/MergeTreeWhereOptimizer.cpp | 131 ++++----- .../MergeTree/MergeTreeWhereOptimizer.h | 10 +- .../MergeTree/MergeTreeWriteAheadLog.cpp | 2 +- .../MergeTree/MergedBlockOutputStream.cpp | 3 +- .../MergeTree/MergedBlockOutputStream.h | 2 + .../MergedColumnOnlyOutputStream.cpp | 2 + .../MergeTree/MergedColumnOnlyOutputStream.h | 2 + src/Storages/MergeTree/MutateTask.cpp | 64 ++++- .../MergeTree/registerStorageMergeTree.cpp | 5 + src/Storages/MutationCommands.cpp | 11 + src/Storages/MutationCommands.h | 5 +- src/Storages/Statistic/Statistic.cpp | 155 +++++++++++ src/Storages/Statistic/Statistic.h | 262 ++++++++++++++++++ src/Storages/StatisticsDescription.cpp | 120 ++++++++ src/Storages/StatisticsDescription.h | 46 +++ src/Storages/StorageInMemoryMetadata.cpp | 13 + src/Storages/StorageInMemoryMetadata.h | 8 + 58 files changed, 1391 insertions(+), 118 deletions(-) create mode 100644 src/Parsers/ASTStatisticDeclaration.cpp create mode 100644 src/Parsers/ASTStatisticDeclaration.h create mode 100644 src/Storages/Statistic/Statistic.cpp create mode 100644 src/Storages/Statistic/Statistic.h create mode 100644 src/Storages/StatisticsDescription.cpp create mode 100644 src/Storages/StatisticsDescription.h diff --git a/src/Access/Common/AccessType.h b/src/Access/Common/AccessType.h index f65a77c1d6a..dac3f813dd6 100644 --- a/src/Access/Common/AccessType.h +++ b/src/Access/Common/AccessType.h @@ -51,6 +51,11 @@ enum class AccessType M(ALTER_CLEAR_INDEX, "CLEAR INDEX", TABLE, ALTER_INDEX) \ M(ALTER_INDEX, "INDEX", GROUP, ALTER_TABLE) /* allows to execute ALTER ORDER BY or ALTER {ADD|DROP...} INDEX */\ \ + M(ALTER_ADD_STATISTIC, "ALTER ADD STATISTIC", TABLE, ALTER_STATISTIC) \ + M(ALTER_DROP_STATISTIC, "ALTER DROP STATISTIC", TABLE, ALTER_STATISTIC) \ + M(ALTER_MATERIALIZE_STATISTIC, "ALTER MATERIALIZE STATISTIC", TABLE, ALTER_STATISTIC) \ + M(ALTER_STATISTIC, "STATISTIC", GROUP, ALTER_TABLE) /* allows to execute ALTER ORDER BY or ALTER {ADD|DROP...} INDEX */\ + \ M(ALTER_ADD_PROJECTION, "ADD PROJECTION", TABLE, ALTER_PROJECTION) \ M(ALTER_DROP_PROJECTION, "DROP PROJECTION", TABLE, ALTER_PROJECTION) \ M(ALTER_MATERIALIZE_PROJECTION, "MATERIALIZE PROJECTION", TABLE, ALTER_PROJECTION) \ diff --git a/src/AggregateFunctions/QuantileTDigest.h b/src/AggregateFunctions/QuantileTDigest.h index 915f6763e52..8706f77c12d 100644 --- a/src/AggregateFunctions/QuantileTDigest.h +++ b/src/AggregateFunctions/QuantileTDigest.h @@ -43,6 +43,7 @@ namespace ErrorCodes template class QuantileTDigest { + friend class TDigestStatistic; using Value = Float32; using Count = Float32; using BetterFloat = Float64; // For intermediate results and sum(Count). Must have better precision, than Count @@ -334,6 +335,16 @@ public: compress(); // Allows reading/writing TDigests with different epsilon/max_centroids params } + Float64 getCountLessThan(Float64 value) const + { + + ///Count sum = 0; + ///Value prev_mean = centroids.front().mean; + ///Count prev_count = centroids.front().count; + + return value; + } + /** Calculates the quantile q [0, 1] based on the digest. * For an empty digest returns NaN. */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ca428fbff3a..df49992595d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -247,6 +247,7 @@ add_object_library(clickhouse_storages Storages) add_object_library(clickhouse_storages_mysql Storages/MySQL) add_object_library(clickhouse_storages_distributed Storages/Distributed) add_object_library(clickhouse_storages_mergetree Storages/MergeTree) +add_object_library(clickhouse_storages_statistic Storages/Statistic) add_object_library(clickhouse_storages_liveview Storages/LiveView) add_object_library(clickhouse_storages_windowview Storages/WindowView) add_object_library(clickhouse_client Client) diff --git a/src/Databases/DatabasesCommon.cpp b/src/Databases/DatabasesCommon.cpp index bb98e2bd3bb..de27c4fd8e7 100644 --- a/src/Databases/DatabasesCommon.cpp +++ b/src/Databases/DatabasesCommon.cpp @@ -46,11 +46,13 @@ void applyMetadataChangesToCreateQuery(const ASTPtr & query, const StorageInMemo { ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns); ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.secondary_indices); + ASTPtr new_statistics = InterpreterCreateQuery::formatStatistics(metadata.statistics); ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints); ASTPtr new_projections = InterpreterCreateQuery::formatProjections(metadata.projections); ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns); ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices); + ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->stats, new_statistics); ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints); ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->projections, new_projections); } diff --git a/src/Interpreters/InterpreterAlterQuery.cpp b/src/Interpreters/InterpreterAlterQuery.cpp index e82415f1aca..7f30ee9337d 100644 --- a/src/Interpreters/InterpreterAlterQuery.cpp +++ b/src/Interpreters/InterpreterAlterQuery.cpp @@ -287,6 +287,21 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS required_access.emplace_back(AccessType::ALTER_SAMPLE_BY, database, table); break; } + case ASTAlterCommand::ADD_STATISTIC: + { + required_access.emplace_back(AccessType::ALTER_ADD_STATISTIC, database, table); + break; + } + case ASTAlterCommand::DROP_STATISTIC: + { + required_access.emplace_back(AccessType::ALTER_DROP_STATISTIC, database, table); + break; + } + case ASTAlterCommand::MATERIALIZE_STATISTIC: + { + required_access.emplace_back(AccessType::ALTER_MATERIALIZE_STATISTIC, database, table); + break; + } case ASTAlterCommand::ADD_INDEX: { required_access.emplace_back(AccessType::ALTER_ADD_INDEX, database, table); diff --git a/src/Interpreters/InterpreterCreateQuery.cpp b/src/Interpreters/InterpreterCreateQuery.cpp index d0bb3dd389f..5c0b58eddc8 100644 --- a/src/Interpreters/InterpreterCreateQuery.cpp +++ b/src/Interpreters/InterpreterCreateQuery.cpp @@ -455,6 +455,16 @@ ASTPtr InterpreterCreateQuery::formatIndices(const IndicesDescription & indices) return res; } +ASTPtr InterpreterCreateQuery::formatStatistics(const StatisticsDescriptions & statistics) +{ + auto res = std::make_shared(); + + for (const auto & statistic : statistics) + res->children.push_back(statistic.definition_ast->clone()); + + return res; +} + ASTPtr InterpreterCreateQuery::formatConstraints(const ConstraintsDescription & constraints) { auto res = std::make_shared(); @@ -706,6 +716,11 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti properties.indices.push_back(index_desc); } + if (create.columns_list->stats) + for (const auto & statistic : create.columns_list->stats->children) + properties.stats.push_back( + StatisticDescription::getStatisticFromAST(statistic->clone(), properties.columns, getContext())); + if (create.columns_list->projections) for (const auto & projection_ast : create.columns_list->projections->children) { @@ -791,11 +806,13 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::getTableProperti ASTPtr new_columns = formatColumns(properties.columns); ASTPtr new_indices = formatIndices(properties.indices); + ASTPtr new_statistics = formatStatistics(properties.stats); ASTPtr new_constraints = formatConstraints(properties.constraints); ASTPtr new_projections = formatProjections(properties.projections); create.columns_list->setOrReplace(create.columns_list->columns, new_columns); create.columns_list->setOrReplace(create.columns_list->indices, new_indices); + create.columns_list->setOrReplace(create.columns_list->stats, new_statistics); create.columns_list->setOrReplace(create.columns_list->constraints, new_constraints); create.columns_list->setOrReplace(create.columns_list->projections, new_projections); diff --git a/src/Interpreters/InterpreterCreateQuery.h b/src/Interpreters/InterpreterCreateQuery.h index a5fa6576091..88eba3e0d79 100644 --- a/src/Interpreters/InterpreterCreateQuery.h +++ b/src/Interpreters/InterpreterCreateQuery.h @@ -38,6 +38,7 @@ public: static ASTPtr formatColumns(const NamesAndTypesList & columns, const NamesAndAliases & alias_columns); static ASTPtr formatColumns(const ColumnsDescription & columns); static ASTPtr formatIndices(const IndicesDescription & indices); + static ASTPtr formatStatistics(const StatisticsDescriptions & statistics); static ASTPtr formatConstraints(const ConstraintsDescription & constraints); static ASTPtr formatProjections(const ProjectionsDescription & projections); @@ -80,6 +81,7 @@ private: { ColumnsDescription columns; IndicesDescription indices; + StatisticsDescriptions stats; ConstraintsDescription constraints; ProjectionsDescription projections; }; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 6ea15312ec4..be8443d56a7 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -687,6 +687,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( MergeTreeWhereOptimizer where_optimizer{ std::move(column_compressed_sizes), metadata_snapshot, + storage->getConditionEstimatorByPredicate(query_info, context), queried_columns, supported_prewhere_columns, log}; diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 25c52ad8925..22105f063fa 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -479,6 +479,7 @@ void MutationsInterpreter::prepare(bool dry_run) /// TODO Should we get columns, indices and projections from the part itself? Table metadata may be different const ColumnsDescription & columns_desc = metadata_snapshot->getColumns(); const IndicesDescription & indices_desc = metadata_snapshot->getSecondaryIndices(); + const StatisticsDescriptions & statistics_desc = metadata_snapshot->getStatistics(); const ProjectionsDescription & projections_desc = metadata_snapshot->getProjections(); auto storage_snapshot = std::make_shared(*source.getStorage(), metadata_snapshot); @@ -682,7 +683,7 @@ void MutationsInterpreter::prepare(bool dry_run) } else if (command.type == MutationCommand::MATERIALIZE_INDEX) { - mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION); + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); auto it = std::find_if( std::cbegin(indices_desc), std::end(indices_desc), [&](const IndexDescription & index) @@ -703,9 +704,25 @@ void MutationsInterpreter::prepare(bool dry_run) materialized_indices.emplace(command.index_name); } } + else if (command.type == MutationCommand::MATERIALIZE_STATISTIC) + { + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); + auto it = std::find_if( + std::cbegin(statistics_desc), std::end(statistics_desc), + [&](const StatisticDescription & statistic) + { + return statistic.name == command.statistic_name; + }); + if (it == std::cend(statistics_desc)) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown statistic: {}", command.statistic_name); + + for (const auto & column : it->column_names) + dependencies.emplace(column, ColumnDependency::STATISTIC); + materialized_statistics.emplace(command.statistic_name); + } else if (command.type == MutationCommand::MATERIALIZE_PROJECTION) { - mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION); + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); const auto & projection = projections_desc.get(command.projection_name); if (!source.hasIndexOrProjection(projection.getDirectoryName())) { @@ -716,12 +733,17 @@ void MutationsInterpreter::prepare(bool dry_run) } else if (command.type == MutationCommand::DROP_INDEX) { - mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION); + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); materialized_indices.erase(command.index_name); } + else if (command.type == MutationCommand::DROP_STATISTIC) + { + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); + materialized_statistics.erase(command.statistic_name); + } else if (command.type == MutationCommand::DROP_PROJECTION) { - mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION); + mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION); materialized_projections.erase(command.projection_name); } else if (command.type == MutationCommand::MATERIALIZE_TTL) @@ -770,7 +792,9 @@ void MutationsInterpreter::prepare(bool dry_run) auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_index_or_projection); for (const auto & dependency : new_dependencies) { - if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION) + if (dependency.kind == ColumnDependency::SKIP_INDEX + || dependency.kind == ColumnDependency::PROJECTION + || dependency.kind == ColumnDependency::STATISTIC) dependencies.insert(dependency); } } @@ -1288,7 +1312,7 @@ QueryPipelineBuilder MutationsInterpreter::execute() Block MutationsInterpreter::getUpdatedHeader() const { // If it's an index/projection materialization, we don't write any data columns, thus empty header is used - return mutation_kind.mutation_kind == MutationKind::MUTATE_INDEX_PROJECTION ? Block{} : *updated_header; + return mutation_kind.mutation_kind == MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION ? Block{} : *updated_header; } const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const diff --git a/src/Interpreters/MutationsInterpreter.h b/src/Interpreters/MutationsInterpreter.h index d783b503531..4d95f56ee71 100644 --- a/src/Interpreters/MutationsInterpreter.h +++ b/src/Interpreters/MutationsInterpreter.h @@ -91,6 +91,8 @@ public: NameSet grabMaterializedIndices() { return std::move(materialized_indices); } + NameSet grabMaterializedStatistics() { return std::move(materialized_statistics); } + NameSet grabMaterializedProjections() { return std::move(materialized_projections); } struct MutationKind @@ -98,7 +100,7 @@ public: enum MutationKindEnum { MUTATE_UNKNOWN, - MUTATE_INDEX_PROJECTION, + MUTATE_INDEX_STATISTIC_PROJECTION, MUTATE_OTHER, } mutation_kind = MUTATE_UNKNOWN; @@ -212,6 +214,7 @@ private: NameSet materialized_indices; NameSet materialized_projections; + NameSet materialized_statistics; MutationKind mutation_kind; /// Do we meet any index or projection mutation. diff --git a/src/Parsers/ASTAlterQuery.cpp b/src/Parsers/ASTAlterQuery.cpp index 61e5903fad5..b04c0efa85c 100644 --- a/src/Parsers/ASTAlterQuery.cpp +++ b/src/Parsers/ASTAlterQuery.cpp @@ -123,6 +123,9 @@ const char * ASTAlterCommand::typeToString(ASTAlterCommand::Type type) case LIVE_VIEW_REFRESH: return "LIVE_VIEW_REFRESH"; case MODIFY_DATABASE_SETTING: return "MODIFY_DATABASE_SETTING"; case MODIFY_COMMENT: return "MODIFY_COMMENT"; + case ADD_STATISTIC: return "ADD_STATISTIC"; + case DROP_STATISTIC: return "DROP_STATISTIC"; + case MATERIALIZE_STATISTIC: return "MATERIALIZE_STATISTIC"; } UNREACHABLE(); } @@ -248,6 +251,41 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState & partition->formatImpl(settings, state, frame); } } + else if (type == ASTAlterCommand::ADD_STATISTIC) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << "ADD STATISTIC " << (if_not_exists ? "IF NOT EXISTS " : "") + << (settings.hilite ? hilite_none : ""); + statistic_decl->formatImpl(settings, state, frame); + + if (first) + settings.ostr << (settings.hilite ? hilite_keyword : "") << " FIRST " << (settings.hilite ? hilite_none : ""); + else if (statistic) /// AFTER + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " AFTER " << (settings.hilite ? hilite_none : ""); + statistic->formatImpl(settings, state, frame); + } + } + else if (type == ASTAlterCommand::DROP_STATISTIC) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << (clear_statistic ? "CLEAR " : "DROP ") << "INDEX " + << (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : ""); + statistic->formatImpl(settings, state, frame); + if (partition) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + } + else if (type == ASTAlterCommand::MATERIALIZE_STATISTIC) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << "MATERIALIZE STATISTIC " << (settings.hilite ? hilite_none : ""); + statistic->formatImpl(settings, state, frame); + if (partition) + { + settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : ""); + partition->formatImpl(settings, state, frame); + } + } else if (type == ASTAlterCommand::ADD_CONSTRAINT) { settings.ostr << (settings.hilite ? hilite_keyword : "") << "ADD CONSTRAINT " << (if_not_exists ? "IF NOT EXISTS " : "") diff --git a/src/Parsers/ASTAlterQuery.h b/src/Parsers/ASTAlterQuery.h index 1400113fa9c..251e8c233e2 100644 --- a/src/Parsers/ASTAlterQuery.h +++ b/src/Parsers/ASTAlterQuery.h @@ -54,6 +54,10 @@ public: DROP_PROJECTION, MATERIALIZE_PROJECTION, + ADD_STATISTIC, + DROP_STATISTIC, + MATERIALIZE_STATISTIC, + DROP_PARTITION, DROP_DETACHED_PARTITION, ATTACH_PARTITION, @@ -129,6 +133,9 @@ public: */ ASTPtr projection; + ASTPtr statistic_decl; + ASTPtr statistic; + /** Used in DROP PARTITION, ATTACH PARTITION FROM, UPDATE, DELETE queries. * The value or ID of the partition is stored here. */ @@ -167,6 +174,8 @@ public: bool clear_index = false; /// for CLEAR INDEX (do not drop index from metadata) + bool clear_statistic = false; /// for CLEAR STATISTIC (do not drop statistic from metadata) + bool clear_projection = false; /// for CLEAR PROJECTION (do not drop projection from metadata) bool if_not_exists = false; /// option for ADD_COLUMN diff --git a/src/Parsers/ASTCreateQuery.cpp b/src/Parsers/ASTCreateQuery.cpp index 196681a8801..5c34841e6a7 100644 --- a/src/Parsers/ASTCreateQuery.cpp +++ b/src/Parsers/ASTCreateQuery.cpp @@ -132,6 +132,8 @@ ASTPtr ASTColumns::clone() const res->set(res->columns, columns->clone()); if (indices) res->set(res->indices, indices->clone()); + if (stats) + res->set(res->stats, stats->clone()); if (constraints) res->set(res->constraints, constraints->clone()); if (projections) @@ -166,6 +168,16 @@ void ASTColumns::formatImpl(const FormatSettings & s, FormatState & state, Forma list.children.push_back(elem); } } + if (stats) + { + for (const auto & stat : stats->children) + { + auto elem = std::make_shared(); + elem->prefix = "STATISTIC"; + elem->set(elem->elem, stat->clone()); + list.children.push_back(elem); + } + } if (constraints) { for (const auto & constraint : constraints->children) diff --git a/src/Parsers/ASTCreateQuery.h b/src/Parsers/ASTCreateQuery.h index 230996f610e..0c1a139c2eb 100644 --- a/src/Parsers/ASTCreateQuery.h +++ b/src/Parsers/ASTCreateQuery.h @@ -53,6 +53,7 @@ class ASTColumns : public IAST public: ASTExpressionList * columns = nullptr; ASTExpressionList * indices = nullptr; + ASTExpressionList * stats = nullptr; ASTExpressionList * constraints = nullptr; ASTExpressionList * projections = nullptr; IAST * primary_key = nullptr; @@ -66,7 +67,7 @@ public: bool empty() const { return (!columns || columns->children.empty()) && (!indices || indices->children.empty()) && (!constraints || constraints->children.empty()) - && (!projections || projections->children.empty()); + && (!projections || projections->children.empty()) && (!stats || stats->children.empty()); } void forEachPointerToChild(std::function f) override diff --git a/src/Parsers/ASTStatisticDeclaration.cpp b/src/Parsers/ASTStatisticDeclaration.cpp new file mode 100644 index 00000000000..53b20b167b7 --- /dev/null +++ b/src/Parsers/ASTStatisticDeclaration.cpp @@ -0,0 +1,35 @@ +#include + +#include +#include +#include + + +namespace DB +{ + +ASTPtr ASTStatisticDeclaration::clone() const +{ + auto res = std::make_shared(); + + res->name = name; + + if (columns) + res->set(res->columns, columns->clone()); + if (type) + res->set(res->type, type->clone()); + return std::move(res); +} + + +void ASTStatisticDeclaration::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const +{ + s.ostr << backQuoteIfNeed(name); + s.ostr << " "; + columns->formatImpl(s, state, frame); + s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : ""); + type->formatImpl(s, state, frame); +} + +} + diff --git a/src/Parsers/ASTStatisticDeclaration.h b/src/Parsers/ASTStatisticDeclaration.h new file mode 100644 index 00000000000..0d5ab7723e9 --- /dev/null +++ b/src/Parsers/ASTStatisticDeclaration.h @@ -0,0 +1,26 @@ +#pragma once + +#include + +namespace DB +{ + +class ASTFunction; + +/** name BY columns TYPE typename(args) in create query + */ +class ASTStatisticDeclaration : public IAST +{ +public: + String name; + IAST * columns; + ASTFunction * type; + + /** Get the text that identifies this element. */ + String getID(char) const override { return "Stat"; } + + ASTPtr clone() const override; + void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override; +}; + +} diff --git a/src/Parsers/ParserAlterQuery.cpp b/src/Parsers/ParserAlterQuery.cpp index 8292b52f092..bb94d98d587 100644 --- a/src/Parsers/ParserAlterQuery.cpp +++ b/src/Parsers/ParserAlterQuery.cpp @@ -44,6 +44,11 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserKeyword s_clear_index("CLEAR INDEX"); ParserKeyword s_materialize_index("MATERIALIZE INDEX"); + ParserKeyword s_add_statistic("ADD STATISTIC"); + ParserKeyword s_drop_statistic("DROP STATISTIC"); + ParserKeyword s_clear_statistic("CLEAR STATISTIC"); + ParserKeyword s_materialize_statistic("MATERIALIZE STATISTIC"); + ParserKeyword s_add_constraint("ADD CONSTRAINT"); ParserKeyword s_drop_constraint("DROP CONSTRAINT"); @@ -112,6 +117,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected ParserIdentifier parser_remove_property; ParserCompoundColumnDeclaration parser_col_decl; ParserIndexDeclaration parser_idx_decl; + ParserStatisticDeclaration parser_stat_decl; ParserConstraintDeclaration parser_constraint_decl; ParserProjectionDeclaration parser_projection_decl; ParserCompoundColumnDeclaration parser_modify_col_decl(false, false, true); @@ -327,6 +333,70 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected return false; } } + else if (s_add_statistic.ignore(pos, expected)) + { + if (s_if_not_exists.ignore(pos, expected)) + command->if_not_exists = true; + + if (!parser_stat_decl.parse(pos, command->statistic_decl, expected)) + return false; + + command->type = ASTAlterCommand::ADD_STATISTIC; + + if (s_first.ignore(pos, expected)) + command->first = true; + else if (s_after.ignore(pos, expected)) + { + if (!parser_name.parse(pos, command->statistic, expected)) + return false; + } + } + else if (s_drop_statistic.ignore(pos, expected)) + { + if (s_if_exists.ignore(pos, expected)) + command->if_exists = true; + + if (!parser_name.parse(pos, command->statistic, expected)) + return false; + + command->type = ASTAlterCommand::DROP_STATISTIC; + command->detach = false; + } + else if (s_clear_statistic.ignore(pos, expected)) + { + if (s_if_exists.ignore(pos, expected)) + command->if_exists = true; + + if (!parser_name.parse(pos, command->statistic, expected)) + return false; + + command->type = ASTAlterCommand::DROP_STATISTIC; + command->clear_statistic = true; + command->detach = false; + + if (s_in_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + } + } + else if (s_materialize_statistic.ignore(pos, expected)) + { + if (s_if_exists.ignore(pos, expected)) + command->if_exists = true; + + if (!parser_name.parse(pos, command->statistic, expected)) + return false; + + command->type = ASTAlterCommand::MATERIALIZE_STATISTIC; + command->detach = false; + + if (s_in_partition.ignore(pos, expected)) + { + if (!parser_partition.parse(pos, command->partition, expected)) + return false; + } + } else if (s_add_projection.ignore(pos, expected)) { if (s_if_not_exists.ignore(pos, expected)) diff --git a/src/Parsers/ParserCreateQuery.cpp b/src/Parsers/ParserCreateQuery.cpp index adf3513ba40..eb79e250a1d 100644 --- a/src/Parsers/ParserCreateQuery.cpp +++ b/src/Parsers/ParserCreateQuery.cpp @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -157,6 +158,39 @@ bool ParserIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expe return true; } +bool ParserStatisticDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) +{ + ParserKeyword s_type("TYPE"); + + ParserIdentifier name_p; + ParserDataType data_type_p; + ParserExpression expression_p; + + ASTPtr name; + ASTPtr columns; + ASTPtr type; + + if (!name_p.parse(pos, name, expected)) + return false; + + if (!expression_p.parse(pos, columns, expected)) + return false; + + if (!s_type.ignore(pos, expected)) + return false; + + if (!data_type_p.parse(pos, type, expected)) + return false; + + auto stat = std::make_shared(); + stat->name = name->as().name(); + stat->set(stat->columns, columns); + stat->set(stat->type, type); + node = stat; + + return true; +} + bool ParserConstraintDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserKeyword s_check("CHECK"); @@ -226,11 +260,13 @@ bool ParserProjectionDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & bool ParserTablePropertyDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected) { ParserKeyword s_index("INDEX"); + ParserKeyword s_stat("STATISTIC"); ParserKeyword s_constraint("CONSTRAINT"); ParserKeyword s_projection("PROJECTION"); ParserKeyword s_primary_key("PRIMARY KEY"); ParserIndexDeclaration index_p; + ParserStatisticDeclaration stat_p; ParserConstraintDeclaration constraint_p; ParserProjectionDeclaration projection_p; ParserColumnDeclaration column_p{true, true}; @@ -248,6 +284,11 @@ bool ParserTablePropertyDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expecte if (!constraint_p.parse(pos, new_node, expected)) return false; } + else if (s_stat.ignore(pos, expected)) + { + if (!stat_p.parse(pos, new_node, expected)) + return false; + } else if (s_projection.ignore(pos, expected)) { if (!projection_p.parse(pos, new_node, expected)) @@ -297,6 +338,7 @@ bool ParserTablePropertiesDeclarationList::parseImpl(Pos & pos, ASTPtr & node, E ASTPtr columns = std::make_shared(); ASTPtr indices = std::make_shared(); + ASTPtr stats = std::make_shared(); ASTPtr constraints = std::make_shared(); ASTPtr projections = std::make_shared(); ASTPtr primary_key; @@ -307,6 +349,8 @@ bool ParserTablePropertiesDeclarationList::parseImpl(Pos & pos, ASTPtr & node, E columns->children.push_back(elem); else if (elem->as()) indices->children.push_back(elem); + else if (elem->as()) + stats->children.push_back(elem); else if (elem->as()) constraints->children.push_back(elem); else if (elem->as()) @@ -330,6 +374,8 @@ bool ParserTablePropertiesDeclarationList::parseImpl(Pos & pos, ASTPtr & node, E res->set(res->columns, columns); if (!indices->children.empty()) res->set(res->indices, indices); + if (!stats->children.empty()) + res->set(res->stats, stats); if (!constraints->children.empty()) res->set(res->constraints, constraints); if (!projections->children.empty()) diff --git a/src/Parsers/ParserCreateQuery.h b/src/Parsers/ParserCreateQuery.h index 5f79a4b68f6..4e85e3456f3 100644 --- a/src/Parsers/ParserCreateQuery.h +++ b/src/Parsers/ParserCreateQuery.h @@ -380,6 +380,18 @@ protected: bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; }; +/** name BY columns TYPE typename(arg1, arg2, ...) */ +/** name BY columns */ +class ParserStatisticDeclaration : public IParserBase +{ +public: + ParserStatisticDeclaration() = default; + +protected: + const char * getName() const override { return "statistics declaration"; } + bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override; +}; + class ParserConstraintDeclaration : public IParserBase { protected: diff --git a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp index ca8a412bf2e..7c542733927 100644 --- a/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp +++ b/src/Processors/QueryPlan/Optimizations/optimizePrewhere.cpp @@ -158,6 +158,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes) MergeTreeWhereOptimizer where_optimizer{ std::move(column_compressed_sizes), storage_metadata, + storage.getConditionEstimatorByPredicate(read_from_merge_tree->getQueryInfo(), context), queried_columns, storage.supportedPrewhereColumns(), &Poco::Logger::get("QueryPlanOptimizePrewhere")}; diff --git a/src/Storages/AlterCommands.cpp b/src/Storages/AlterCommands.cpp index a9247f9b898..98a5ae3bb2f 100644 --- a/src/Storages/AlterCommands.cpp +++ b/src/Storages/AlterCommands.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -232,6 +233,25 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ return command; } + else if (command_ast->type == ASTAlterCommand::ADD_STATISTIC) + { + AlterCommand command; + command.ast = command_ast->clone(); + command.statistic_decl = command_ast->statistic_decl; + command.type = AlterCommand::ADD_STATISTIC; + + const auto & ast_stat_decl = command_ast->statistic_decl->as(); + + command.statistic_name = ast_stat_decl.name; + + if (command_ast->statistic) + command.after_statistic_name = command_ast->statistic->as().name(); + + command.if_not_exists = command_ast->if_not_exists; + command.first = command_ast->first; + + return command; + } else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT) { AlterCommand command; @@ -291,6 +311,20 @@ std::optional AlterCommand::parse(const ASTAlterCommand * command_ return command; } + else if (command_ast->type == ASTAlterCommand::DROP_STATISTIC) + { + AlterCommand command; + command.ast = command_ast->clone(); + command.type = AlterCommand::DROP_STATISTIC; + command.statistic_name = command_ast->statistic->as().name(); + command.if_exists = command_ast->if_exists; + command.clear = command_ast->clear_statistic; + + if (command_ast->partition) + command.partition = command_ast->partition; + + return command; + } else if (command_ast->type == ASTAlterCommand::DROP_PROJECTION) { AlterCommand command; @@ -553,6 +587,68 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context) metadata.secondary_indices.erase(erase_it); } } + else if (type == ADD_STATISTIC) + { + if (std::any_of( + metadata.statistics.cbegin(), + metadata.statistics.cend(), + [this](const auto & statistic) + { + return statistic.name == statistic_name; + })) + { + if (if_not_exists) + return; + else + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot add statistic {} : statistic with this name already exists", statistic_name); + } + + auto insert_it = metadata.statistics.end(); + + /// insert the index in the beginning of the indices list + if (first) + insert_it = metadata.statistics.begin(); + + if (!after_statistic_name.empty()) + { + insert_it = std::find_if( + metadata.statistics.begin(), + metadata.statistics.end(), + [this](const auto & statistic) + { + return statistic.name == after_statistic_name; + }); + + if (insert_it == metadata.statistics.end()) + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong statistic name. Cannot find statistic {} to insert after", backQuote(after_statistic_name)); + + ++insert_it; + } + + metadata.statistics.emplace(insert_it, StatisticDescription::getStatisticFromAST(statistic_decl, metadata.columns, context)); + } + else if (type == DROP_STATISTIC) + { + if (!partition && !clear) + { + auto erase_it = std::find_if( + metadata.statistics.begin(), + metadata.statistics.end(), + [this](const auto & statistic) + { + return statistic.name == statistic_name; + }); + + if (erase_it == metadata.statistics.end()) + { + if (if_exists) + return; + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong statistic name. Cannot find statistic {} to drop", backQuote(statistic_name)); + } + + metadata.statistics.erase(erase_it); + } + } else if (type == ADD_CONSTRAINT) { auto constraints = metadata.constraints.getConstraints(); @@ -877,6 +973,10 @@ std::optional AlterCommand::tryConvertToMutationCommand(Storage result.partition = partition; result.predicate = nullptr; + } + else if (type == DROP_STATISTIC) + { + } else if (type == DROP_PROJECTION) { diff --git a/src/Storages/AlterCommands.h b/src/Storages/AlterCommands.h index 3e526dcc0bb..eae538815f0 100644 --- a/src/Storages/AlterCommands.h +++ b/src/Storages/AlterCommands.h @@ -38,6 +38,8 @@ struct AlterCommand DROP_CONSTRAINT, ADD_PROJECTION, DROP_PROJECTION, + ADD_STATISTIC, + DROP_STATISTIC, MODIFY_TTL, MODIFY_SETTING, RESET_SETTING, @@ -118,6 +120,10 @@ struct AlterCommand /// For ADD/DROP PROJECTION String projection_name; + ASTPtr statistic_decl = nullptr; + String after_statistic_name; + String statistic_name; + /// For MODIFY TTL ASTPtr ttl = nullptr; diff --git a/src/Storages/ColumnDependency.h b/src/Storages/ColumnDependency.h index 6c3c96ec62a..b9088dd0227 100644 --- a/src/Storages/ColumnDependency.h +++ b/src/Storages/ColumnDependency.h @@ -24,7 +24,10 @@ struct ColumnDependency TTL_EXPRESSION, /// TTL is set for @column_name. - TTL_TARGET + TTL_TARGET, + + /// Exists any statistic, that requires @column_name + STATISTIC, }; ColumnDependency(const String & column_name_, Kind kind_) diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index b262d88db57..9231b0c3286 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -135,6 +136,8 @@ public: /// Returns true if the storage supports queries with the PREWHERE section. virtual bool supportsPrewhere() const { return false; } + virtual ConditionEstimator getConditionEstimatorByPredicate(const SelectQueryInfo &, ContextPtr) const { return {}; } + /// Returns which columns supports PREWHERE, or empty std::nullopt if all columns is supported. /// This is needed for engines whose aggregates data from multiple tables, like Merge. virtual std::optional supportedPrewhereColumns() const { return std::nullopt; } diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 23bbc1c7f9d..48fb3303445 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -736,7 +736,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory( } MergedBlockOutputStream part_out( - new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, + new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, {}, CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR); part_out.write(block); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index fd73d802579..2b1cb5fc2de 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -102,6 +103,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) = 0; diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 8f39c31eae0..c5aa74b3a92 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -1,4 +1,5 @@ -#include "Storages/MergeTree/IDataPartStorage.h" +#include +#include #include #include @@ -365,6 +366,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->metadata_snapshot, global_ctx->merging_columns, MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()), + MergeTreeStatisticFactory::instance().getMany(global_ctx->metadata_snapshot->getStatistics()), ctx->compression_codec, global_ctx->txn, /*reset_columns=*/ true, @@ -580,6 +582,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const /// because all of them were already recalculated and written /// as key part of vertical merge std::vector{}, + std::vector{}, /// TODO: think about it &global_ctx->written_offset_columns, global_ctx->to->getIndexGranularity()); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index e9c3a7f66ae..31a48d92083 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -8458,7 +8458,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createEmptyPart( const auto & index_factory = MergeTreeIndexFactory::instance(); MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, - index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec, txn); + index_factory.getMany(metadata_snapshot->getSecondaryIndices()), + Statistics{}, + compression_codec, txn); bool sync_on_insert = settings->fsync_after_insert; diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp index 07e20f16a9f..6f5320062b2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.cpp @@ -52,6 +52,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter( const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) @@ -66,7 +67,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter( return std::make_unique( shared_from_this(), ordered_columns_list, metadata_snapshot, - indices_to_recalc, getMarksFileExtension(), + indices_to_recalc, stats_to_recalc_, getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartCompact.h b/src/Storages/MergeTree/MergeTreeDataPartCompact.h index b115692a7cf..341d464a9da 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartCompact.h @@ -43,6 +43,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp index 468747a6c36..a23b4395df6 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp @@ -52,6 +52,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter( const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & /* indices_to_recalc */, + const Statistics & /* stats_to_recalc_ */, const CompressionCodecPtr & /* default_codec */, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & /* computed_index_granularity */) @@ -92,7 +93,8 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices()); - MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec, NO_TRANSACTION_PTR); + auto stats = MergeTreeStatisticFactory::instance().getMany(metadata_snapshot->getStatistics()); + MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, stats, compression_codec, NO_TRANSACTION_PTR); out.write(block); const auto & projections = metadata_snapshot->getProjections(); @@ -125,6 +127,7 @@ MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & MergedBlockOutputStream projection_out( new_projection_part, desc.metadata, new_projection_part->getColumns(), projection_indices, + {}, projection_compression_codec, NO_TRANSACTION_PTR); projection_out.write(old_projection_part->block); diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.h b/src/Storages/MergeTree/MergeTreeDataPartInMemory.h index db7244d8e99..e6ef05319c5 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.h +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.h @@ -32,6 +32,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp index f44cbdd8628..20f430bed8f 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.cpp @@ -50,13 +50,14 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartWide::getWriter( const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) { return std::make_unique( shared_from_this(), columns_list, - metadata_snapshot, indices_to_recalc, + metadata_snapshot, indices_to_recalc, stats_to_recalc_, getMarksFileExtension(), default_codec_, writer_settings, computed_index_granularity); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWide.h b/src/Storages/MergeTree/MergeTreeDataPartWide.h index 5ee497b9b21..6fc195bbfe7 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWide.h @@ -38,6 +38,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & writer_settings, const MergeTreeIndexGranularity & computed_index_granularity) override; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 5e1da21da5b..5024ff0217d 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -21,12 +21,13 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact( const NamesAndTypesList & columns_list_, const StorageMetadataPtr & metadata_snapshot_, const std::vector & indices_to_recalc_, + const Statistics & stats_to_recalc, const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, const MergeTreeIndexGranularity & index_granularity_) : MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_, - indices_to_recalc_, marks_file_extension_, + indices_to_recalc_, stats_to_recalc, marks_file_extension_, default_codec_, settings_, index_granularity_) , plain_file(data_part_->getDataPartStorage().writeFile( MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION, @@ -176,6 +177,7 @@ void MergeTreeDataPartWriterCompact::write(const Block & block, const IColumn::P auto granules_to_write = getGranulesToWrite(index_granularity, flushed_block.rows(), getCurrentMark(), /* last_block = */ false); writeDataBlockPrimaryIndexAndSkipIndices(flushed_block, granules_to_write); setCurrentMark(getCurrentMark() + granules_to_write.size()); + calculateAndSerializeStatistics(flushed_block); } } @@ -422,6 +424,7 @@ void MergeTreeDataPartWriterCompact::fillChecksums(IMergeTreeDataPart::Checksums fillPrimaryIndexChecksums(checksums); fillSkipIndicesChecksums(checksums); + fillStatisticsChecksums(checksums); } void MergeTreeDataPartWriterCompact::finish(bool sync) @@ -434,6 +437,7 @@ void MergeTreeDataPartWriterCompact::finish(bool sync) finishPrimaryIndexSerialization(sync); finishSkipIndicesSerialization(sync); + finishStatisticsSerialization(sync); } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h index 06f8122393f..c5a045c42d0 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.h @@ -15,6 +15,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot_, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc, const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index f57ffa5ee14..c4f037f65c3 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -1,8 +1,6 @@ #include #include #include -#include -#include "IO/WriteBufferFromFileDecorator.h" namespace DB { @@ -11,7 +9,8 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() +template +void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() { /// Here the main goal is to do preFinalize calls for plain_file and marks_file /// Before that all hashing and compression buffers have to be finalized @@ -22,36 +21,45 @@ void MergeTreeDataPartWriterOnDisk::Stream::preFinalize() compressor.finalize(); plain_hashing.finalize(); - if (compress_marks) + if constexpr (!only_plain_file) { - marks_compressed_hashing.finalize(); - marks_compressor.finalize(); + if (compress_marks) + { + marks_compressed_hashing.finalize(); + marks_compressor.finalize(); + } + + marks_hashing.finalize(); } - marks_hashing.finalize(); - plain_file->preFinalize(); - marks_file->preFinalize(); + if constexpr (!only_plain_file) + marks_file->preFinalize(); is_prefinalized = true; } -void MergeTreeDataPartWriterOnDisk::Stream::finalize() +template +void MergeTreeDataPartWriterOnDisk::Stream::finalize() { if (!is_prefinalized) preFinalize(); plain_file->finalize(); - marks_file->finalize(); + if constexpr (!only_plain_file) + marks_file->finalize(); } -void MergeTreeDataPartWriterOnDisk::Stream::sync() const +template +void MergeTreeDataPartWriterOnDisk::Stream::sync() const { plain_file->sync(); - marks_file->sync(); + if constexpr (!only_plain_file) + marks_file->sync(); } -MergeTreeDataPartWriterOnDisk::Stream::Stream( +template<> +MergeTreeDataPartWriterOnDisk::Stream::Stream( const String & escaped_column_name_, const MutableDataPartStoragePtr & data_part_storage, const String & data_path_, @@ -78,7 +86,27 @@ MergeTreeDataPartWriterOnDisk::Stream::Stream( { } -void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums) +template<> +MergeTreeDataPartWriterOnDisk::Stream::Stream( + const String & escaped_column_name_, + const MutableDataPartStoragePtr & data_part_storage, + const String & data_path_, + const std::string & data_file_extension_, + const CompressionCodecPtr & compression_codec_, + size_t max_compress_block_size_, + const WriteSettings & query_write_settings) : + escaped_column_name(escaped_column_name_), + data_file_extension{data_file_extension_}, + plain_file(data_part_storage->writeFile(data_path_ + data_file_extension, max_compress_block_size_, query_write_settings)), + plain_hashing(*plain_file), + compressor(plain_hashing, compression_codec_, max_compress_block_size_), + compressed_hashing(compressor), + compress_marks(false) +{ +} + +template +void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums) { String name = escaped_column_name; @@ -88,15 +116,18 @@ void MergeTreeDataPartWriterOnDisk::Stream::addToChecksums(MergeTreeData::DataPa checksums.files[name + data_file_extension].file_size = plain_hashing.count(); checksums.files[name + data_file_extension].file_hash = plain_hashing.getHash(); - if (compress_marks) + if constexpr (!only_plain_file) { - checksums.files[name + marks_file_extension].is_compressed = true; - checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count(); - checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash(); - } + if (compress_marks) + { + checksums.files[name + marks_file_extension].is_compressed = true; + checksums.files[name + marks_file_extension].uncompressed_size = marks_compressed_hashing.count(); + checksums.files[name + marks_file_extension].uncompressed_hash = marks_compressed_hashing.getHash(); + } - checksums.files[name + marks_file_extension].file_size = marks_hashing.count(); - checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash(); + checksums.files[name + marks_file_extension].file_size = marks_hashing.count(); + checksums.files[name + marks_file_extension].file_hash = marks_hashing.getHash(); + } } @@ -105,12 +136,14 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk( const NamesAndTypesList & columns_list_, const StorageMetadataPtr & metadata_snapshot_, const MergeTreeIndices & indices_to_recalc_, + const Statistics & stats_to_recalc_, const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, const MergeTreeIndexGranularity & index_granularity_) : IMergeTreeDataPartWriter(data_part_, columns_list_, metadata_snapshot_, settings_, index_granularity_) , skip_indices(indices_to_recalc_) + , stats(stats_to_recalc_) , marks_file_extension(marks_file_extension_) , default_codec(default_codec_) , compute_granularity(index_granularity.empty()) @@ -126,6 +159,7 @@ MergeTreeDataPartWriterOnDisk::MergeTreeDataPartWriterOnDisk( if (settings.rewrite_primary_key) initPrimaryIndex(); initSkipIndices(); + initStatistics(); } // Implementation is split into static functions for ability @@ -207,6 +241,20 @@ void MergeTreeDataPartWriterOnDisk::initPrimaryIndex() } } +void MergeTreeDataPartWriterOnDisk::initStatistics() +{ + for (const auto & stat_ptr : stats) + { + String stats_name = stat_ptr->getFileName(); + stats_streams.emplace_back(std::make_unique>( + stats_name, + data_part->getDataPartStoragePtr(), + stats_name, STAT_FILE_SUFFIX, + default_codec, settings.max_compress_block_size, + settings.query_write_settings)); + } +} + void MergeTreeDataPartWriterOnDisk::initSkipIndices() { ParserCodec codec_parser; @@ -217,7 +265,7 @@ void MergeTreeDataPartWriterOnDisk::initSkipIndices() { String stream_name = skip_index->getFileName(); skip_indices_streams.emplace_back( - std::make_unique( + std::make_unique>( stream_name, data_part->getDataPartStoragePtr(), stream_name, skip_index->getSerializedFileExtension(), @@ -279,6 +327,14 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializePrimaryIndex(const Bloc last_block_index_columns[j] = primary_index_block.getByPosition(j).column; } +void MergeTreeDataPartWriterOnDisk::calculateAndSerializeStatistics(const Block & block) +{ + for (const auto & stat_ptr : stats) + { + stat_ptr->update(block); + } +} + void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write) { /// Filling and writing skip indices like in MergeTreeDataPartWriterWide::writeColumn @@ -417,6 +473,27 @@ void MergeTreeDataPartWriterOnDisk::fillSkipIndicesChecksums(MergeTreeData::Data } } +void MergeTreeDataPartWriterOnDisk::finishStatisticsSerialization(bool sync) +{ + for (auto & stream : stats_streams) + { + stream->finalize(); + if (sync) + stream->sync(); + } +} + +void MergeTreeDataPartWriterOnDisk::fillStatisticsChecksums(MergeTreeData::DataPart::Checksums & checksums) +{ + for (size_t i = 0; i < stats.size(); i++) + { + auto & stream = *stats_streams[i]; + stats[i]->serialize(stream.compressed_hashing); + stream.preFinalize(); + stream.addToChecksums(checksums); + } +} + void MergeTreeDataPartWriterOnDisk::finishSkipIndicesSerialization(bool sync) { for (auto & stream : skip_indices_streams) @@ -442,4 +519,7 @@ Names MergeTreeDataPartWriterOnDisk::getSkipIndicesColumns() const return Names(skip_indexes_column_names_set.begin(), skip_indexes_column_names_set.end()); } +template struct MergeTreeDataPartWriterOnDisk::Stream; +template struct MergeTreeDataPartWriterOnDisk::Stream; + } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index b76b74ab717..30d43b9c180 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace DB { @@ -46,6 +47,7 @@ public: /// Helper class, which holds chain of buffers to write data file with marks. /// It is used to write: one column, skip index or all columns (in compact format). + template struct Stream { Stream( @@ -61,6 +63,15 @@ public: size_t marks_compress_block_size_, const WriteSettings & query_write_settings); + Stream( + const String & escaped_column_name_, + const MutableDataPartStoragePtr & data_part_storage, + const String & data_path_, + const std::string & data_file_extension_, + const CompressionCodecPtr & compression_codec_, + size_t max_compress_block_size_, + const WriteSettings & query_write_settings); + String escaped_column_name; std::string data_file_extension; std::string marks_file_extension; @@ -73,9 +84,9 @@ public: /// marks_compressed_hashing -> marks_compressor -> marks_hashing -> marks_file std::unique_ptr marks_file; - HashingWriteBuffer marks_hashing; - CompressedWriteBuffer marks_compressor; - HashingWriteBuffer marks_compressed_hashing; + std::conditional_t marks_hashing; + std::conditional_t marks_compressor; + std::conditional_t marks_compressed_hashing; bool compress_marks; bool is_prefinalized = false; @@ -89,13 +100,15 @@ public: void addToChecksums(IMergeTreeDataPart::Checksums & checksums); }; - using StreamPtr = std::unique_ptr; + using StreamPtr = std::unique_ptr>; + using StatisticStreamPtr = std::unique_ptr>; MergeTreeDataPartWriterOnDisk( const MergeTreeMutableDataPartPtr & data_part_, const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot_, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, @@ -117,6 +130,8 @@ protected: /// require additional state: skip_indices_aggregators and skip_index_accumulated_marks void calculateAndSerializeSkipIndices(const Block & skip_indexes_block, const Granules & granules_to_write); + void calculateAndSerializeStatistics(const Block & stats_block); + /// Finishes primary index serialization: write final primary index row (if required) and compute checksums void fillPrimaryIndexChecksums(MergeTreeData::DataPart::Checksums & checksums); void finishPrimaryIndexSerialization(bool sync); @@ -124,6 +139,9 @@ protected: void fillSkipIndicesChecksums(MergeTreeData::DataPart::Checksums & checksums); void finishSkipIndicesSerialization(bool sync); + void fillStatisticsChecksums(MergeTreeData::DataPart::Checksums & checksums); + void finishStatisticsSerialization(bool sync); + /// Get global number of the current which we are writing (or going to start to write) size_t getCurrentMark() const { return current_mark; } @@ -134,6 +152,9 @@ protected: const MergeTreeIndices skip_indices; + const Statistics stats; + std::vector stats_streams; + const String marks_file_extension; const CompressionCodecPtr default_codec; @@ -166,6 +187,7 @@ protected: private: void initSkipIndices(); void initPrimaryIndex(); + void initStatistics(); virtual void fillIndexGranularity(size_t index_granularity_for_block, size_t rows_in_block) = 0; }; diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index f9fe6f2c8ab..aa97f515074 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -77,12 +77,13 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide( const NamesAndTypesList & columns_list_, const StorageMetadataPtr & metadata_snapshot_, const std::vector & indices_to_recalc_, + const Statistics & stats_to_recalc_, const String & marks_file_extension_, const CompressionCodecPtr & default_codec_, const MergeTreeWriterSettings & settings_, const MergeTreeIndexGranularity & index_granularity_) : MergeTreeDataPartWriterOnDisk(data_part_, columns_list_, metadata_snapshot_, - indices_to_recalc_, marks_file_extension_, + indices_to_recalc_, stats_to_recalc_, marks_file_extension_, default_codec_, settings_, index_granularity_) { const auto & columns = metadata_snapshot->getColumns(); @@ -116,7 +117,7 @@ void MergeTreeDataPartWriterWide::addStreams( auto ast = parseQuery(codec_parser, "(" + Poco::toUpper(settings.marks_compression_codec) + ")", 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); CompressionCodecPtr marks_compression_codec = CompressionCodecFactory::instance().get(ast, nullptr); - column_streams[stream_name] = std::make_unique( + column_streams[stream_name] = std::make_unique>( stream_name, data_part->getDataPartStoragePtr(), stream_name, DATA_FILE_EXTENSION, @@ -256,6 +257,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm calculateAndSerializePrimaryIndex(primary_key_block, granules_to_write); calculateAndSerializeSkipIndices(skip_indexes_block, granules_to_write); + calculateAndSerializeStatistics(block); shiftCurrentMark(granules_to_write); } @@ -272,7 +274,7 @@ void MergeTreeDataPartWriterWide::writeSingleMark( void MergeTreeDataPartWriterWide::flushMarkToFile(const StreamNameAndMark & stream_with_mark, size_t rows_in_mark) { - Stream & stream = *column_streams[stream_with_mark.stream_name]; + auto & stream = *column_streams[stream_with_mark.stream_name]; WriteBuffer & marks_out = stream.compress_marks ? stream.marks_compressed_hashing : stream.marks_hashing; writeIntBinary(stream_with_mark.mark.offset_in_compressed_file, marks_out); @@ -296,7 +298,7 @@ StreamsWithMarks MergeTreeDataPartWriterWide::getCurrentMarksForColumn( if (is_offsets && offset_columns.contains(stream_name)) return; - Stream & stream = *column_streams[stream_name]; + auto & stream = *column_streams[stream_name]; /// There could already be enough data to compress into the new block. if (stream.compressed_hashing.offset() >= settings.min_compress_block_size) @@ -632,6 +634,8 @@ void MergeTreeDataPartWriterWide::fillChecksums(IMergeTreeDataPart::Checksums & fillPrimaryIndexChecksums(checksums); fillSkipIndicesChecksums(checksums); + + fillStatisticsChecksums(checksums); } void MergeTreeDataPartWriterWide::finish(bool sync) @@ -644,6 +648,8 @@ void MergeTreeDataPartWriterWide::finish(bool sync) finishPrimaryIndexSerialization(sync); finishSkipIndicesSerialization(sync); + + finishStatisticsSerialization(sync); } void MergeTreeDataPartWriterWide::writeFinalMark( diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 633b5119474..574225b9614 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -22,6 +22,7 @@ public: const NamesAndTypesList & columns_list, const StorageMetadataPtr & metadata_snapshot, const std::vector & indices_to_recalc, + const Statistics & stats_to_recalc_, const String & marks_file_extension, const CompressionCodecPtr & default_codec, const MergeTreeWriterSettings & settings, diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index 7e306880e9c..da67bff07f3 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -538,6 +538,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPartImpl( metadata_snapshot, columns, indices, + MergeTreeStatisticFactory::instance().getMany(metadata_snapshot->getStatistics()), compression_codec, context->getCurrentTransaction(), false, @@ -670,6 +671,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl( metadata_snapshot, columns, MergeTreeIndices{}, + Statistics{}, /// TODO(hanfei): It should be helpful to write statistics for projection result. compression_codec, NO_TRANSACTION_PTR, false, false, data.getContext()->getWriteSettings()); diff --git a/src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp b/src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp index 25a4579c73e..c4046133542 100644 --- a/src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp +++ b/src/Storages/MergeTree/MergeTreeWhereOptimizer.cpp @@ -20,16 +20,18 @@ namespace DB /// Conditions like "x = N" are considered good if abs(N) > threshold. /// This is used to assume that condition is likely to have good selectivity. -static constexpr auto threshold = 2; +/// static constexpr auto threshold = 2; MergeTreeWhereOptimizer::MergeTreeWhereOptimizer( std::unordered_map column_sizes_, const StorageMetadataPtr & metadata_snapshot, + const ConditionEstimator & estimator_, const Names & queried_columns_, const std::optional & supported_columns_, Poco::Logger * log_) - : table_columns{collections::map( + : estimator(estimator_) + , table_columns{collections::map( metadata_snapshot->getColumns().getAllPhysical(), [](const NameAndTypePair & col) { return col.name; })} , queried_columns{queried_columns_} , supported_columns{supported_columns_} @@ -132,66 +134,66 @@ static void collectColumns(const RPNBuilderTreeNode & node, const NameSet & colu } } -static bool isConditionGood(const RPNBuilderTreeNode & condition, const NameSet & columns_names) -{ - if (!condition.isFunction()) - return false; - - auto function_node = condition.toFunctionNode(); - - /** We are only considering conditions of form `equals(one, another)` or `one = another`, - * especially if either `one` or `another` is ASTIdentifier - */ - if (function_node.getFunctionName() != "equals" || function_node.getArgumentsSize() != 2) - return false; - - auto lhs_argument = function_node.getArgumentAt(0); - auto rhs_argument = function_node.getArgumentAt(1); - - auto lhs_argument_column_name = lhs_argument.getColumnName(); - auto rhs_argument_column_name = rhs_argument.getColumnName(); - - bool lhs_argument_is_column = columns_names.contains(lhs_argument_column_name); - bool rhs_argument_is_column = columns_names.contains(rhs_argument_column_name); - - bool lhs_argument_is_constant = lhs_argument.isConstant(); - bool rhs_argument_is_constant = rhs_argument.isConstant(); - - RPNBuilderTreeNode * constant_node = nullptr; - - if (lhs_argument_is_column && rhs_argument_is_constant) - constant_node = &rhs_argument; - else if (lhs_argument_is_constant && rhs_argument_is_column) - constant_node = &lhs_argument; - else - return false; - - Field output_value; - DataTypePtr output_type; - if (!constant_node->tryGetConstant(output_value, output_type)) - return false; - - const auto type = output_value.getType(); - - /// check the value with respect to threshold - if (type == Field::Types::UInt64) - { - const auto value = output_value.get(); - return value > threshold; - } - else if (type == Field::Types::Int64) - { - const auto value = output_value.get(); - return value < -threshold || threshold < value; - } - else if (type == Field::Types::Float64) - { - const auto value = output_value.get(); - return value < threshold || threshold < value; - } - - return false; -} +/// static bool isConditionGood(const RPNBuilderTreeNode & condition, const NameSet & columns_names) +/// { +/// if (!condition.isFunction()) +/// return false; +/// +/// auto function_node = condition.toFunctionNode(); +/// +/// /** We are only considering conditions of form `equals(one, another)` or `one = another`, +/// * especially if either `one` or `another` is ASTIdentifier +/// */ +/// if (function_node.getFunctionName() != "equals" || function_node.getArgumentsSize() != 2) +/// return false; +/// +/// auto lhs_argument = function_node.getArgumentAt(0); +/// auto rhs_argument = function_node.getArgumentAt(1); +/// +/// auto lhs_argument_column_name = lhs_argument.getColumnName(); +/// auto rhs_argument_column_name = rhs_argument.getColumnName(); +/// +/// bool lhs_argument_is_column = columns_names.contains(lhs_argument_column_name); +/// bool rhs_argument_is_column = columns_names.contains(rhs_argument_column_name); +/// +/// bool lhs_argument_is_constant = lhs_argument.isConstant(); +/// bool rhs_argument_is_constant = rhs_argument.isConstant(); +/// +/// RPNBuilderTreeNode * constant_node = nullptr; +/// +/// if (lhs_argument_is_column && rhs_argument_is_constant) +/// constant_node = &rhs_argument; +/// else if (lhs_argument_is_constant && rhs_argument_is_column) +/// constant_node = &lhs_argument; +/// else +/// return false; +/// +/// Field output_value; +/// DataTypePtr output_type; +/// if (!constant_node->tryGetConstant(output_value, output_type)) +/// return false; +/// +/// const auto type = output_value.getType(); +/// +/// /// check the value with respect to threshold +/// if (type == Field::Types::UInt64) +/// { +/// const auto value = output_value.get(); +/// return value > threshold; +/// } +/// else if (type == Field::Types::Int64) +/// { +/// const auto value = output_value.get(); +/// return value < -threshold || threshold < value; +/// } +/// else if (type == Field::Types::Float64) +/// { +/// const auto value = output_value.get(); +/// return value < threshold || threshold < value; +/// } +/// +/// return false; +/// } void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTreeNode & node, const WhereOptimizerContext & where_optimizer_context) const { @@ -229,7 +231,10 @@ void MergeTreeWhereOptimizer::analyzeImpl(Conditions & res, const RPNBuilderTree && cond.table_columns.size() < queried_columns.size(); if (cond.viable) - cond.good = isConditionGood(node, table_columns); + cond.selectivity = estimator.estimateSelectivity(node); + + ///if (cond.viable) + /// cond.good = isConditionGood(node, table_columns); res.emplace_back(std::move(cond)); } diff --git a/src/Storages/MergeTree/MergeTreeWhereOptimizer.h b/src/Storages/MergeTree/MergeTreeWhereOptimizer.h index 18555a72db1..6985237a7c6 100644 --- a/src/Storages/MergeTree/MergeTreeWhereOptimizer.h +++ b/src/Storages/MergeTree/MergeTreeWhereOptimizer.h @@ -6,6 +6,7 @@ #include #include +#include "Storages/Statistic/Statistic.h" #include #include @@ -37,6 +38,7 @@ public: MergeTreeWhereOptimizer( std::unordered_map column_sizes_, const StorageMetadataPtr & metadata_snapshot, + const ConditionEstimator & estimator_, const Names & queried_columns_, const std::optional & supported_columns_, Poco::Logger * log_); @@ -69,12 +71,12 @@ private: /// Can condition be moved to prewhere? bool viable = false; - /// Does the condition presumably have good selectivity? - bool good = false; + /// the lower the better + Float64 selectivity = 0; auto tuple() const { - return std::make_tuple(!viable, !good, columns_size, table_columns.size()); + return std::make_tuple(!viable, selectivity, columns_size, table_columns.size()); } /// Is condition a better candidate for moving to PREWHERE? @@ -137,6 +139,8 @@ private: static NameSet determineArrayJoinedNames(const ASTSelectQuery & select); + const ConditionEstimator estimator; + const NameSet table_columns; const Names queried_columns; const std::optional supported_columns; diff --git a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp index 39c4157a42e..a3e0f6bf77b 100644 --- a/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp +++ b/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp @@ -198,7 +198,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore( part, metadata_snapshot, block.getNamesAndTypesList(), - {}, + {}, {}, CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR); diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index bfd9e92b4eb..f4bff1cd42c 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -19,6 +19,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( const StorageMetadataPtr & metadata_snapshot_, const NamesAndTypesList & columns_list_, const MergeTreeIndices & skip_indices, + const Statistics & statistics, CompressionCodecPtr default_codec_, const MergeTreeTransactionPtr & txn, bool reset_columns_, @@ -47,7 +48,7 @@ MergedBlockOutputStream::MergedBlockOutputStream( data_part->version.setCreationTID(tid, nullptr); data_part->storeVersionMetadata(); - writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings, {}); + writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, statistics, default_codec, writer_settings, {}); } /// If data is pre-sorted. diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.h b/src/Storages/MergeTree/MergedBlockOutputStream.h index 20e6de5a99b..48eca3e71f6 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.h +++ b/src/Storages/MergeTree/MergedBlockOutputStream.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace DB @@ -19,6 +20,7 @@ public: const StorageMetadataPtr & metadata_snapshot_, const NamesAndTypesList & columns_list_, const MergeTreeIndices & skip_indices, + const Statistics & statistics, CompressionCodecPtr default_codec_, const MergeTreeTransactionPtr & txn, bool reset_columns_ = false, diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp index 3b2eb96f2d4..492a573a738 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.cpp @@ -16,6 +16,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( const Block & header_, CompressionCodecPtr default_codec, const MergeTreeIndices & indices_to_recalc, + const Statistics & stats_to_recalc_, WrittenOffsetColumns * offset_columns_, const MergeTreeIndexGranularity & index_granularity, const MergeTreeIndexGranularityInfo * index_granularity_info) @@ -36,6 +37,7 @@ MergedColumnOnlyOutputStream::MergedColumnOnlyOutputStream( header.getNamesAndTypesList(), metadata_snapshot_, indices_to_recalc, + stats_to_recalc_, default_codec, writer_settings, index_granularity); diff --git a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h index f382b0fef60..1a2c56a4f7b 100644 --- a/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h +++ b/src/Storages/MergeTree/MergedColumnOnlyOutputStream.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { @@ -19,6 +20,7 @@ public: const Block & header_, CompressionCodecPtr default_codec_, const MergeTreeIndices & indices_to_recalc_, + const Statistics & stats_to_recalc_, WrittenOffsetColumns * offset_columns_ = nullptr, const MergeTreeIndexGranularity & index_granularity = {}, const MergeTreeIndexGranularityInfo * index_granularity_info_ = nullptr); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index b98b0844ee7..d16fde50f0a 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -71,6 +72,7 @@ static void splitAndModifyMutationCommands( for (const auto & command : commands) { if (command.type == MutationCommand::Type::MATERIALIZE_INDEX + || command.type == MutationCommand::Type::MATERIALIZE_STATISTIC || command.type == MutationCommand::Type::MATERIALIZE_COLUMN || command.type == MutationCommand::Type::MATERIALIZE_PROJECTION || command.type == MutationCommand::Type::MATERIALIZE_TTL @@ -190,6 +192,7 @@ static void splitAndModifyMutationCommands( { if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_COLUMN + || command.type == MutationCommand::Type::MATERIALIZE_STATISTIC || command.type == MutationCommand::Type::MATERIALIZE_PROJECTION || command.type == MutationCommand::Type::MATERIALIZE_TTL || command.type == MutationCommand::Type::DELETE @@ -437,6 +440,20 @@ static ExecuteTTLType shouldExecuteTTL(const StorageMetadataPtr & metadata_snaps return has_ttl_expression ? ExecuteTTLType::RECALCULATE : ExecuteTTLType::NONE; } +static std::set getStatisticsToRecalculate(const StorageMetadataPtr & metadata_snapshot, const NameSet & materialized_stats) +{ + const auto & stats_factory = MergeTreeStatisticFactory::instance(); + std::set stats_to_recalc; + const auto & stats = metadata_snapshot->getStatistics(); + for (const auto & stat_desc : stats) + { + if (materialized_stats.contains(stat_desc.name)) + { + stats_to_recalc.insert(stats_factory.get(stat_desc)); + } + } + return stats_to_recalc; +} /// Return set of indices which should be recalculated during mutation also /// wraps input stream into additional expression stream @@ -527,7 +544,8 @@ static NameSet collectFilesToSkip( const Block & updated_header, const std::set & indices_to_recalc, const String & mrk_extension, - const std::set & projections_to_recalc) + const std::set & projections_to_recalc, + const std::set & stats_to_recalc) { NameSet files_to_skip = source_part->getFileNamesWithoutChecksums(); @@ -544,6 +562,9 @@ static NameSet collectFilesToSkip( for (const auto & projection : projections_to_recalc) files_to_skip.insert(projection->getDirectoryName()); + for (const auto & stat : stats_to_recalc) + files_to_skip.insert(stat->getFileName() + STAT_FILE_SUFFIX); + if (isWidePart(source_part)) { auto new_stream_counts = getStreamCounts(new_part, new_part->getColumns().getNames()); @@ -620,6 +641,11 @@ static NameToNameVector collectFilesForRenames( if (source_part->checksums.has(command.column_name + ".proj")) add_rename(command.column_name + ".proj", ""); } + //else if (command.type == MutationCommand::Type::DROP_STATISTICS) + //{ + // if (source_part->checksums.has(command.column_name + ".stat")) + // add_rename(command.column_name + ".stat", ""); + //} else if (isWidePart(source_part)) { if (command.type == MutationCommand::Type::DROP_COLUMN) @@ -830,6 +856,7 @@ struct MutationContext NamesAndTypesList storage_columns; NameSet materialized_indices; NameSet materialized_projections; + NameSet materialized_statistics; MergeTreeData::MutableDataPartPtr new_data_part; IMergedBlockOutputStreamPtr out{nullptr}; @@ -840,6 +867,7 @@ struct MutationContext IMergeTreeDataPart::MinMaxIndexPtr minmax_idx{nullptr}; std::set indices_to_recalc; + std::set stats_to_recalc; std::set projections_to_recalc; MergeTreeData::DataPart::Checksums existing_indices_checksums; NameSet files_to_skip; @@ -1292,6 +1320,30 @@ private: } } + Statistics stats; + const auto & statistics = ctx->metadata_snapshot->getStatistics(); + for (const auto & stat : statistics) + { + if (ctx->materialized_statistics.contains(stat.name)) + { + stats.push_back(MergeTreeStatisticFactory::instance().get(stat)); + } + else + { + auto prefix = fmt::format("{}{}.", STAT_FILE_PREFIX, stat.name); + auto it = ctx->source_part->checksums.files.upper_bound(prefix); + while (it != ctx->source_part->checksums.files.end()) + { + if (!startsWith(it->first, prefix)) + break; + + entries_to_hardlink.insert(it->first); + ctx->existing_indices_checksums.addFile(it->first, it->second.file_size, it->second.file_hash); + ++it; + } + } + } + NameSet removed_projections; for (const auto & command : ctx->for_file_renames) { @@ -1376,11 +1428,13 @@ private: ctx->minmax_idx = std::make_shared(); + LOG_TRACE(ctx->log, "going to write {} stats", stats.size()); ctx->out = std::make_shared( ctx->new_data_part, ctx->metadata_snapshot, ctx->new_data_part->getColumns(), skip_indices, + stats, ctx->compression_codec, ctx->txn, /*reset_columns=*/ true, @@ -1575,6 +1629,7 @@ private: ctx->updated_header, ctx->compression_codec, std::vector(ctx->indices_to_recalc.begin(), ctx->indices_to_recalc.end()), + Statistics(ctx->stats_to_recalc.begin(), ctx->stats_to_recalc.end()), nullptr, ctx->source_part->index_granularity, &ctx->source_part->index_granularity_info @@ -1840,6 +1895,8 @@ bool MutateTask::prepare() ctx->metadata_snapshot->getColumns().getNamesOfPhysical(), context_for_reading, settings); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); + ctx->materialized_statistics = ctx->interpreter->grabMaterializedStatistics(); + LOG_INFO(ctx->log, "stats number {}", ctx->materialized_statistics.size()); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->mutating_pipeline_builder = ctx->interpreter->execute(); ctx->updated_header = ctx->interpreter->getUpdatedHeader(); @@ -1904,13 +1961,16 @@ bool MutateTask::prepare() ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(ctx->metadata_snapshot, ctx->materialized_projections); + ctx->stats_to_recalc = MutationHelpers::getStatisticsToRecalculate(ctx->metadata_snapshot, ctx->materialized_statistics); + ctx->files_to_skip = MutationHelpers::collectFilesToSkip( ctx->source_part, ctx->new_data_part, ctx->updated_header, ctx->indices_to_recalc, ctx->mrk_extension, - ctx->projections_to_recalc); + ctx->projections_to_recalc, + ctx->stats_to_recalc); ctx->files_to_rename = MutationHelpers::collectFilesForRenames( ctx->source_part, diff --git a/src/Storages/MergeTree/registerStorageMergeTree.cpp b/src/Storages/MergeTree/registerStorageMergeTree.cpp index 75f1542e30e..b93604bcac6 100644 --- a/src/Storages/MergeTree/registerStorageMergeTree.cpp +++ b/src/Storages/MergeTree/registerStorageMergeTree.cpp @@ -579,6 +579,11 @@ static StoragePtr create(const StorageFactory::Arguments & args) for (auto & index : args.query.columns_list->indices->children) metadata.secondary_indices.push_back(IndexDescription::getIndexFromAST(index, columns, context)); + if (args.query.columns_list && args.query.columns_list->stats) + for (const auto & stat : args.query.columns_list->stats->children) + metadata.statistics.push_back( + StatisticDescription::getStatisticFromAST(stat, columns, args.getContext())); + if (args.query.columns_list && args.query.columns_list->projections) for (auto & projection_ast : args.query.columns_list->projections->children) { diff --git a/src/Storages/MutationCommands.cpp b/src/Storages/MutationCommands.cpp index 6eb345b449e..b00dca95c56 100644 --- a/src/Storages/MutationCommands.cpp +++ b/src/Storages/MutationCommands.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "Parsers/ASTAlterQuery.h" #include #include @@ -68,6 +69,16 @@ std::optional MutationCommand::parse(ASTAlterCommand * command, res.index_name = command->index->as().name(); return res; } + else if (command->type == ASTAlterCommand::MATERIALIZE_STATISTIC) + { + MutationCommand res; + res.ast = command->ptr(); + res.type = MATERIALIZE_STATISTIC; + res.partition = command->partition; + res.predicate = nullptr; + res.statistic_name = command->statistic->as().name(); + return res; + } else if (command->type == ASTAlterCommand::MATERIALIZE_PROJECTION) { MutationCommand res; diff --git a/src/Storages/MutationCommands.h b/src/Storages/MutationCommands.h index 5ef0cfda1be..c9fa59bc309 100644 --- a/src/Storages/MutationCommands.h +++ b/src/Storages/MutationCommands.h @@ -30,10 +30,12 @@ struct MutationCommand UPDATE, MATERIALIZE_INDEX, MATERIALIZE_PROJECTION, + MATERIALIZE_STATISTIC, READ_COLUMN, /// Read column and apply conversions (MODIFY COLUMN alter query). DROP_COLUMN, DROP_INDEX, DROP_PROJECTION, + DROP_STATISTIC, MATERIALIZE_TTL, RENAME_COLUMN, MATERIALIZE_COLUMN, @@ -48,9 +50,10 @@ struct MutationCommand /// Columns with corresponding actions std::unordered_map column_to_update_expression; - /// For MATERIALIZE INDEX and PROJECTION + /// For MATERIALIZE INDEX and PROJECTION and STATISTIC String index_name; String projection_name; + String statistic_name; /// For MATERIALIZE INDEX, UPDATE and DELETE. ASTPtr partition; diff --git a/src/Storages/Statistic/Statistic.cpp b/src/Storages/Statistic/Statistic.cpp new file mode 100644 index 00000000000..eaf7d828e1f --- /dev/null +++ b/src/Storages/Statistic/Statistic.cpp @@ -0,0 +1,155 @@ +#include +#include +#include +#include +#include "Storages/MergeTree/RPNBuilder.h" + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; + extern const int INCORRECT_QUERY; +} + + +std::optional ConditionEstimator::extractSingleColumn(const RPNBuilderTreeNode & node) const +{ + if (node.isConstant()) + { + return std::nullopt; + } + + if (!node.isFunction()) + { + auto column_name = node.getColumnName(); + return {column_name}; + } + + auto function_node = node.toFunctionNode(); + size_t arguments_size = function_node.getArgumentsSize(); + std::optional result; + for (size_t i = 0; i < arguments_size; ++i) + { + auto function_argument = function_node.getArgumentAt(i); + auto subresult = extractSingleColumn(function_argument); + if (subresult == std::nullopt) + continue; + else if (subresult == "") + return ""; + else if (result == std::nullopt) + result = subresult; + else if (result.value() != subresult.value()) + return ""; + } + return result; +} + +std::pair ConditionEstimator::extractBinaryOp(const RPNBuilderTreeNode & node, const std::string & column_name) const +{ + if (!node.isFunction()) + return {}; + + auto function_node = node.toFunctionNode(); + if (function_node.getArgumentsSize() != 2) + return {}; + + std::string function_name = function_node.getFunctionName(); + + auto lhs_argument = function_node.getArgumentAt(0); + auto rhs_argument = function_node.getArgumentAt(1); + + auto lhs_argument_column_name = lhs_argument.getColumnName(); + auto rhs_argument_column_name = rhs_argument.getColumnName(); + + bool lhs_argument_is_column = column_name == (lhs_argument_column_name); + bool rhs_argument_is_column = column_name == (rhs_argument_column_name); + + bool lhs_argument_is_constant = lhs_argument.isConstant(); + bool rhs_argument_is_constant = rhs_argument.isConstant(); + + RPNBuilderTreeNode * constant_node = nullptr; + + if (lhs_argument_is_column && rhs_argument_is_constant) + constant_node = &rhs_argument; + else if (lhs_argument_is_constant && rhs_argument_is_column) + constant_node = &lhs_argument; + else + return {}; + + Field output_value; + DataTypePtr output_type; + if (!constant_node->tryGetConstant(output_value, output_type)) + return {}; + + const auto type = output_value.getType(); + Float64 value; + if (type == Field::Types::Int64) + value = output_value.get(); + else if (type == Field::Types::UInt64) + value = output_value.get(); + else if (type == Field::Types::Float64) + value = output_value.get(); + return std::make_pair(function_name, value); +} + +StatisticPtr TDigestCreator(const StatisticDescription & stat) +{ + if (stat.column_names.size() != 1) + { + /// throw + } + + /// TODO: check column data types. + return StatisticPtr(new TDigestStatistic(stat)); +} + +void MergeTreeStatisticFactory::registerCreator(const std::string & stat_type, Creator creator) +{ + if (!creators.emplace(stat_type, std::move(creator)).second) + throw Exception(ErrorCodes::LOGICAL_ERROR, "MergeTreeStatisticFactory: the statistic creator type {} is not unique", stat_type); +} + +MergeTreeStatisticFactory::MergeTreeStatisticFactory() +{ + registerCreator("t_digest", TDigestCreator); + + ///registerCreator("cm_sketch", CMSketchCreator); +} + +MergeTreeStatisticFactory & MergeTreeStatisticFactory::instance() +{ + static MergeTreeStatisticFactory instance; + return instance; +} + +StatisticPtr MergeTreeStatisticFactory::get(const StatisticDescription & stat) const +{ + auto it = creators.find(stat.type); + if (it == creators.end()) + { + throw Exception(ErrorCodes::INCORRECT_QUERY, + "Unknown Statistic type '{}'. Available types: {}", stat.type, + std::accumulate(creators.cbegin(), creators.cend(), std::string{}, + [] (auto && left, const auto & right) -> std::string + { + if (left.empty()) + return right.first; + else + return left + ", " + right.first; + }) + ); + } + return std::make_shared(stat); +} + +Statistics MergeTreeStatisticFactory::getMany(const std::vector & stats) const +{ + Statistics result; + for (const auto & stat : stats) + result.push_back(get(stat)); + return result; +} + +} diff --git a/src/Storages/Statistic/Statistic.h b/src/Storages/Statistic/Statistic.h new file mode 100644 index 00000000000..2d16ef90ebe --- /dev/null +++ b/src/Storages/Statistic/Statistic.h @@ -0,0 +1,262 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include "Common/Exception.h" +#include +#include "Storages/MergeTree/RPNBuilder.h" + +#include + +/// this is for user-defined statistic. +/// For auto collected statisic, we can use 'auto_statistic_' +constexpr auto STAT_FILE_PREFIX = "statistic_"; +constexpr auto STAT_FILE_SUFFIX = ".stat"; + +namespace DB +{ + +class IStatistic; +using StatisticPtr = std::shared_ptr; +using Statistics = std::vector; + +class IStatistic +{ +public: + explicit IStatistic(const StatisticDescription & stat_) + : statistics(stat_) + { + } + virtual ~IStatistic() = default; + + String getFileName() const + { + return STAT_FILE_PREFIX + name(); + } + + const String & name() const + { + return statistics.name; + } + + const String & columnName() const + { + return statistics.column_names[0]; + } + /// const String& type() const = 0; + /// virtual StatisticType statisticType() const = 0; + + virtual void serialize(WriteBuffer & buf) = 0; + virtual void deserialize(ReadBuffer & buf) = 0; + virtual void update(const Block & block) = 0; + virtual UInt64 count() = 0; + +protected: + + const StatisticDescription & statistics; + +}; + +class TDigestStatistic : public IStatistic +{ + QuantileTDigest data; +public: + explicit TDigestStatistic(const StatisticDescription & stat) : IStatistic(stat) + { + } + + struct Range + { + Float64 left, right; + }; + + /// FIXME: implement correct count estimate method. + Float64 estimateLess(Float64 val) const + { + return data.getCountLessThan(val); + } + + void serialize(WriteBuffer & buf) override + { + data.serialize(buf); + LOG_DEBUG(&Poco::Logger::get("t-digest"), "serialize into {} data", buf.offset()); + } + + void deserialize(ReadBuffer & buf) override + { + data.deserialize(buf); + } + + void update(const Block & block) override + { + const auto & column_with_type = block.getByName(statistics.column_names[0]); + size_t size = block.rows(); + + for (size_t i = 0; i < size; ++i) + { + /// TODO: support more types. + Float64 value = column_with_type.column->getFloat64(i); + data.add(value, 1); + } + + LOG_DEBUG(&Poco::Logger::get("t-digest"), "write into {} data", size); + } + + UInt64 count() override + { + return static_cast(data.count); + } +}; + +class MergeTreeStatisticFactory : private boost::noncopyable +{ +public: + static MergeTreeStatisticFactory & instance(); + + using Creator = std::function; + + StatisticPtr get(const StatisticDescription & stat) const; + + Statistics getMany(const std::vector & stats) const; + + void registerCreator(const std::string & type, Creator creator); + +protected: + MergeTreeStatisticFactory(); + +private: + using Creators = std::unordered_map; + Creators creators; +}; + +class ConditionEstimator +{ +private: + + static constexpr auto default_good_cond_factor = 0.1; + static constexpr auto default_normal_cond_factor = 0.5; + static constexpr auto default_unknown_cond_factor = 1.0; + /// Conditions like "x = N" are considered good if abs(N) > threshold. + /// This is used to assume that condition is likely to have good selectivity. + static constexpr auto threshold = 2; + + UInt64 total_count; + + struct PartColumnEstimator + { + UInt64 part_count; + + std::shared_ptr t_digest; + + void merge(StatisticPtr statistic) + { + UInt64 cur_part_count = statistic->count(); + if (part_count == 0) + part_count = cur_part_count; + + if (typeid_cast(statistic.get())) + { + t_digest = std::static_pointer_cast(statistic); + } + } + + Float64 estimateLess(Float64 val) const + { + if (t_digest != nullptr) + return t_digest -> estimateLess(val); + return part_count * default_normal_cond_factor; + } + + Float64 estimateGreator(Float64 val) const + { + if (t_digest != nullptr) + return part_count - t_digest -> estimateLess(val); + return part_count * default_normal_cond_factor; + } + }; + + struct ColumnEstimator + { + std::map estimators; + + void merge(std::string part_name, StatisticPtr statistic) + { + estimators[part_name].merge(statistic); + } + Float64 estimateLess(Float64 val) const + { + if (estimators.empty()) + return default_normal_cond_factor; + Float64 result = 0; + for (const auto & [key, estimator] : estimators) + result += estimator.estimateLess(val); + return result; + } + + Float64 estimateGreater(Float64 val) const + { + if (estimators.empty()) + return default_normal_cond_factor; + Float64 result = 0; + for (const auto & [key, estimator] : estimators) + result += estimator.estimateGreator(val); + return result; + } + }; + + std::map column_estimators; + std::optional extractSingleColumn(const RPNBuilderTreeNode & node) const; + std::pair extractBinaryOp(const RPNBuilderTreeNode & node, const std::string & column_name) const; + +public: + + ConditionEstimator() = default; + + /// TODO: Support the condition consists of CNF/DNF like (cond1 and cond2) or (cond3) ... + /// Right now we only support simple condition like col = val / col < val + Float64 estimateSelectivity(const RPNBuilderTreeNode & node) const + { + auto col = extractSingleColumn(node); + if (col == std::nullopt || col == "") + { + return default_unknown_cond_factor; + } + auto it = column_estimators.find(col.value()); + ColumnEstimator estimator; + if (it != column_estimators.end()) + { + estimator = it->second; + } + auto [op, val] = extractBinaryOp(node, col.value()); + if (op == "equals") + { + if (val < - threshold || val > threshold) + return default_normal_cond_factor; + else + return default_good_cond_factor; + } + else if (op == "less" || op == "lessThan") + { + return estimator.estimateLess(val) / total_count; + } + else if (op == "greater" || op == "greaterThan") + { + return estimator.estimateLess(val) / total_count; + } + else + return default_unknown_cond_factor; + } + void merge(std::string part_name, StatisticPtr statistic) + { + column_estimators[statistic->columnName()].merge(part_name, statistic); + } + +}; + + +} diff --git a/src/Storages/StatisticsDescription.cpp b/src/Storages/StatisticsDescription.cpp new file mode 100644 index 00000000000..6c7e1244fcd --- /dev/null +++ b/src/Storages/StatisticsDescription.cpp @@ -0,0 +1,120 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int INCORRECT_QUERY; + extern const int LOGICAL_ERROR; +}; + +StatisticDescription StatisticDescription::getStatisticFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context) +{ + const auto * stat_definition = definition_ast->as(); + if (!stat_definition) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot create statistic from non ASTStatisticDeclaration AST"); + + if (stat_definition->name.empty()) + throw Exception(ErrorCodes::INCORRECT_QUERY, "Statistic must have name in definition."); + + // type == nullptr => auto + if (!stat_definition->type) + throw Exception(ErrorCodes::INCORRECT_QUERY, "TYPE is required for statistics"); + + if (stat_definition->type->parameters && !stat_definition->type->parameters->children.empty()) + throw Exception(ErrorCodes::INCORRECT_QUERY, "Statistics type cannot have parameters"); + + StatisticDescription stat; + stat.definition_ast = definition_ast->clone(); + stat.name = stat_definition->name; + stat.type = Poco::toLower(stat_definition->type->name); + + ASTPtr expr_list = extractKeyExpressionList(stat_definition->columns->clone()); + for (const auto & ast : expr_list->children) + { + ASTIdentifier* ident = ast->as(); + if (!ident || !columns.hasPhysical(ident->getColumnName())) + throw Exception(ErrorCodes::INCORRECT_QUERY, "Incorrect column"); + const auto & column = columns.get(ident->getColumnName()); + stat.column_names.push_back(column.name); + stat.data_types.push_back(column.type); + } + + UNUSED(context); + + return stat; +} + +StatisticDescription::StatisticDescription(const StatisticDescription & other) + : definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr) + , name(other.name) + , type(other.type) + , column_names(other.column_names) +{ +} + +StatisticDescription & StatisticDescription::operator=(const StatisticDescription & other) +{ + if (&other == this) + return *this; + + if (other.definition_ast) + definition_ast = other.definition_ast->clone(); + else + definition_ast.reset(); + + name = other.name; + type = other.type; + column_names = other.column_names; + + return *this; +} + + +bool StatisticsDescriptions::has(const String & name) const +{ + for (const auto & statistic : *this) + if (statistic.name == name) + return true; + return false; +} + +String StatisticsDescriptions::toString() const +{ + if (empty()) + return {}; + + ASTExpressionList list; + for (const auto & statistic : *this) + list.children.push_back(statistic.definition_ast); + + return serializeAST(list, true); +} + +StatisticsDescriptions StatisticsDescriptions::parse(const String & str, const ColumnsDescription & columns, ContextPtr context) +{ + StatisticsDescriptions result; + if (str.empty()) + return result; + + ParserStatisticDeclaration parser; + ASTPtr list = parseQuery(parser, str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH); + + for (const auto & index : list->children) + result.emplace_back(StatisticDescription::getStatisticFromAST(index, columns, context)); + + return result; +} + +} diff --git a/src/Storages/StatisticsDescription.h b/src/Storages/StatisticsDescription.h new file mode 100644 index 00000000000..2cbce381990 --- /dev/null +++ b/src/Storages/StatisticsDescription.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +namespace DB +{ + +struct StatisticDescription +{ + /// Definition AST of statistic + ASTPtr definition_ast; + + /// Statistic name + String name; + + String type; + + /// Names of statistic columns + Names column_names; + + /// Data types of statistic columns + DataTypes data_types; + + static StatisticDescription getStatisticFromAST(const ASTPtr & definition_ast, const ColumnsDescription & columns, ContextPtr context); + + StatisticDescription() = default; + + /// We need custom copy constructors because we don't want + /// unintentionaly share AST variables and modify them. + StatisticDescription(const StatisticDescription & other); + StatisticDescription & operator=(const StatisticDescription & other); +}; + +struct StatisticsDescriptions : public std::vector +{ + /// Stat with name exists + bool has(const String & name) const; + /// Convert description to string + String toString() const; + /// Parse description from string + static StatisticsDescriptions parse(const String & str, const ColumnsDescription & columns, ContextPtr context); +}; + +} diff --git a/src/Storages/StorageInMemoryMetadata.cpp b/src/Storages/StorageInMemoryMetadata.cpp index afe75349864..4546d9a8bda 100644 --- a/src/Storages/StorageInMemoryMetadata.cpp +++ b/src/Storages/StorageInMemoryMetadata.cpp @@ -4,6 +4,7 @@ #include #include #include +#include "Storages/StatisticsDescription.h" #include #include #include @@ -28,6 +29,7 @@ namespace ErrorCodes StorageInMemoryMetadata::StorageInMemoryMetadata(const StorageInMemoryMetadata & other) : columns(other.columns) , secondary_indices(other.secondary_indices) + , statistics(other.statistics) , constraints(other.constraints) , projections(other.projections.clone()) , minmax_count_projection( @@ -52,6 +54,7 @@ StorageInMemoryMetadata & StorageInMemoryMetadata::operator=(const StorageInMemo columns = other.columns; secondary_indices = other.secondary_indices; + statistics = other.statistics; constraints = other.constraints; projections = other.projections.clone(); if (other.minmax_count_projection) @@ -91,6 +94,11 @@ void StorageInMemoryMetadata::setSecondaryIndices(IndicesDescription secondary_i secondary_indices = std::move(secondary_indices_); } +void StorageInMemoryMetadata::setStatistics(StatisticsDescriptions statistics_) +{ + statistics = std::move(statistics_); +} + void StorageInMemoryMetadata::setConstraints(ConstraintsDescription constraints_) { constraints = std::move(constraints_); @@ -146,6 +154,11 @@ const IndicesDescription & StorageInMemoryMetadata::getSecondaryIndices() const return secondary_indices; } +const StatisticsDescriptions & StorageInMemoryMetadata::getStatistics() const +{ + return statistics; +} + bool StorageInMemoryMetadata::hasSecondaryIndices() const { return !secondary_indices.empty(); diff --git a/src/Storages/StorageInMemoryMetadata.h b/src/Storages/StorageInMemoryMetadata.h index 4ed7eb8bf29..761788949fb 100644 --- a/src/Storages/StorageInMemoryMetadata.h +++ b/src/Storages/StorageInMemoryMetadata.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -24,6 +25,8 @@ struct StorageInMemoryMetadata ColumnsDescription columns; /// Table indices. Currently supported for MergeTree only. IndicesDescription secondary_indices; + + StatisticsDescriptions statistics; /// Table constraints. Currently supported for MergeTree only. ConstraintsDescription constraints; /// Table projections. Currently supported for MergeTree only. @@ -75,6 +78,9 @@ struct StorageInMemoryMetadata /// Sets secondary indices void setSecondaryIndices(IndicesDescription secondary_indices_); + /// Sets statistics + void setStatistics(StatisticsDescriptions statistics_); + /// Sets constraints void setConstraints(ConstraintsDescription constraints_); @@ -105,6 +111,8 @@ struct StorageInMemoryMetadata /// Returns secondary indices const IndicesDescription & getSecondaryIndices() const; + const StatisticsDescriptions & getStatistics() const; + /// Has at least one non primary index bool hasSecondaryIndices() const; From a0e0a3a522562ac49718fbd7e3cb78d5fad3e45a Mon Sep 17 00:00:00 2001 From: yariks5s Date: Fri, 11 Aug 2023 12:24:44 +0000 Subject: [PATCH 003/654] created groupSortedArray by ..Moving --- .../AggregateFunctionGroupArraySorted.cpp | 115 +++++++++ .../AggregateFunctionGroupArraySorted.h | 222 ++++++++++++++++++ .../registerAggregateFunctions.cpp | 2 + 3 files changed, 339 insertions(+) create mode 100644 src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp create mode 100644 src/AggregateFunctions/AggregateFunctionGroupArraySorted.h diff --git a/src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp b/src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp new file mode 100644 index 00000000000..929a4f21351 --- /dev/null +++ b/src/AggregateFunctions/AggregateFunctionGroupArraySorted.cpp @@ -0,0 +1,115 @@ +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ +struct Settings; + +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH; + extern const int BAD_ARGUMENTS; +} + +namespace +{ + +template +struct Sorted +{ + using Data = SortedData, + std::conditional_t, + NearestFieldType>>; + using Function = SortedImpl; +}; + +template using SortedTemplate = typename Sorted::Function; + +template