From 0056d04b9cf13e34fb3bb913c225f2a020cd6b36 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Tue, 3 Sep 2024 08:24:58 +0000 Subject: [PATCH 1/3] initial ver --- src/Disks/IStoragePolicy.h | 9 +- src/Disks/StoragePolicy.cpp | 16 ++ src/Disks/StoragePolicy.h | 4 + .../MergeTree/DataPartStorageOnDiskBase.cpp | 1 + src/Storages/MergeTree/IMergeTreeDataPart.cpp | 58 +++++ src/Storages/MergeTree/IMergeTreeDataPart.h | 15 ++ src/Storages/MergeTree/MergeTask.cpp | 8 + src/Storages/MergeTree/MergeTreeData.cpp | 3 + .../MergeTree/MergeTreePartsMover.cpp | 63 ++++-- src/Storages/MergeTree/MergeTreePartsMover.h | 9 +- .../MergeTree/MergedBlockOutputStream.cpp | 23 +- src/Storages/MergeTree/MutateTask.cpp | 3 + src/Storages/System/StorageSystemParts.cpp | 7 + .../System/StorageSystemStoragePolicies.cpp | 4 + .../test_insert_data_time/__init__.py | 0 .../test_insert_data_time/configs/cluster.xml | 16 ++ .../test_insert_data_time/configs/macro.xml | 5 + .../integration/test_insert_data_time/test.py | 200 ++++++++++++++++++ .../test_move_policy_jbod/__init__.py | 0 .../config.d/storage_configuration.xml | 57 +++++ .../configs/remote_servers.xml | 17 ++ .../integration/test_move_policy_jbod/test.py | 150 +++++++++++++ tests/integration/test_multiple_disks/test.py | 16 ++ 23 files changed, 665 insertions(+), 19 deletions(-) create mode 100644 tests/integration/test_insert_data_time/__init__.py create mode 100644 tests/integration/test_insert_data_time/configs/cluster.xml create mode 100644 tests/integration/test_insert_data_time/configs/macro.xml create mode 100644 tests/integration/test_insert_data_time/test.py create mode 100644 tests/integration/test_move_policy_jbod/__init__.py create mode 100644 tests/integration/test_move_policy_jbod/configs/config.d/storage_configuration.xml create mode 100644 tests/integration/test_move_policy_jbod/configs/remote_servers.xml create mode 100644 tests/integration/test_move_policy_jbod/test.py diff --git a/src/Disks/IStoragePolicy.h b/src/Disks/IStoragePolicy.h index a6a5fe5f692..7c1016d6e8c 100644 --- a/src/Disks/IStoragePolicy.h +++ b/src/Disks/IStoragePolicy.h @@ -68,7 +68,14 @@ public: /// Check if we have any volume with stopped merges virtual bool hasAnyVolumeWithDisabledMerges() const = 0; virtual bool containsVolume(const String & volume_name) const = 0; - /// Returns disks by type ordered by volumes priority + + enum class MovePolicy : uint8_t + { + BY_PART_SIZE, + BY_INSERT_DATA_TIME + }; + /// Returns policy of how to choose parts for move to the next volume. + virtual IStoragePolicy::MovePolicy getMovePolicy() const = 0; }; } diff --git a/src/Disks/StoragePolicy.cpp b/src/Disks/StoragePolicy.cpp index ccdc34d5d06..6da52dfe681 100644 --- a/src/Disks/StoragePolicy.cpp +++ b/src/Disks/StoragePolicy.cpp @@ -111,6 +111,22 @@ StoragePolicy::StoragePolicy( "Disk move factor have to be in [0., 1.] 
interval, but set to {} in storage policy {}", toString(move_factor), backQuote(name));
 
+    auto move_policy_str = config.getString(config_prefix + ".move_policy", "by_part_size");
+    if (move_policy_str == "by_part_size")
+    {
+        move_policy = IStoragePolicy::MovePolicy::BY_PART_SIZE;
+    }
+    else if (move_policy_str == "by_insert_data_time")
+    {
+        move_policy = IStoragePolicy::MovePolicy::BY_INSERT_DATA_TIME;
+    }
+    else
+    {
+        throw Exception(
+            ErrorCodes::INVALID_CONFIG_PARAMETER,
+            "Unknown value of the move_policy parameter.");
+    }
+
     buildVolumeIndices();
     LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
 }
diff --git a/src/Disks/StoragePolicy.h b/src/Disks/StoragePolicy.h
index 501e033abc3..e7982479f8f 100644
--- a/src/Disks/StoragePolicy.h
+++ b/src/Disks/StoragePolicy.h
@@ -92,6 +92,8 @@ public:
 
     bool containsVolume(const String & volume_name) const override;
 
+    IStoragePolicy::MovePolicy getMovePolicy() const override { return move_policy; }
+
 private:
     Volumes volumes;
     const String name;
@@ -103,6 +105,8 @@ private:
     /// filled more than total_size * move_factor
     double move_factor = 0.1; /// by default move factor is 10%
 
+    MovePolicy move_policy = MovePolicy::BY_PART_SIZE;
+
     void buildVolumeIndices();
 
     LoggerPtr log;
diff --git a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
index df151e8478f..36e30833769 100644
--- a/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
+++ b/src/Storages/MergeTree/DataPartStorageOnDiskBase.cpp
@@ -885,6 +885,7 @@ void DataPartStorageOnDiskBase::clearDirectory(
     request.emplace_back(fs::path(dir) / "delete-on-destroy.txt", true);
     request.emplace_back(fs::path(dir) / "txn_version.txt", true);
     request.emplace_back(fs::path(dir) / "metadata_version.txt", true);
+    request.emplace_back(fs::path(dir) / IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, true);
 
     disk->removeSharedFiles(request, !can_remove_shared_data, names_not_to_remove);
     disk->removeDirectory(dir);
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
index 3a44359b537..5c46c0b1b71 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp
@@ -465,6 +465,31 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
     return {};
 }
 
+time_t IMergeTreeDataPart::getMinTimeOfDataInsertion() const
+{
+    if (min_time_of_data_insert.has_value())
+    {
+        return *min_time_of_data_insert;
+    }
+    if (modification_time == static_cast<time_t>(0))
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent state of the part {}: min_time_of_data_insert doesn't contain a value and modification_time is zero.", name);
+    }
+    return modification_time;
+}
+
+time_t IMergeTreeDataPart::getMaxTimeOfDataInsertion() const
+{
+    if (max_time_of_data_insert.has_value())
+    {
+        return *max_time_of_data_insert;
+    }
+    if (modification_time == static_cast<time_t>(0))
+    {
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "Inconsistent state of the part {}: max_time_of_data_insert doesn't contain a value and modification_time is zero.", name);
+    }
+    return modification_time;
+}
 
 void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos, int32_t metadata_version_)
 {
@@ -736,6 +761,7 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
 
         checkConsistency(require_columns_checksums);
         loadDefaultCompressionCodec();
+        loadInsertTimeInfo();
     }
     catch (...)
     {
@@ -1033,6 +1059,38 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
     }
 }
 
+void IMergeTreeDataPart::loadInsertTimeInfo()
+{
+    bool exists = metadata_manager->exists(MIN_MAX_TIME_OF_DATA_INSERT_FILE);
+    if (!exists)
+    {
+        min_time_of_data_insert = {};
+        max_time_of_data_insert = {};
+        return;
+    }
+    try
+    {
+        auto file_buf = metadata_manager->read(MIN_MAX_TIME_OF_DATA_INSERT_FILE);
+        /// Escape undefined behavior:
+        /// "The behavior is undefined if *this does not contain a value"
+        min_time_of_data_insert = static_cast<time_t>(0);
+        max_time_of_data_insert = static_cast<time_t>(0);
+
+        tryReadText(*min_time_of_data_insert, *file_buf);
+        checkString(" ", *file_buf);
+        tryReadText(*max_time_of_data_insert, *file_buf);
+    }
+    catch (const DB::Exception & ex)
+    {
+        String path = fs::path(getDataPartStorage().getRelativePath()) / MIN_MAX_TIME_OF_DATA_INSERT_FILE;
+        LOG_WARNING(storage.log, "Cannot parse min/max time of data insert for part {} from file {}, error '{}'.",
+            name, path, ex.what());
+
+        min_time_of_data_insert = {};
+        max_time_of_data_insert = {};
+    }
+}
+
 template <typename Writer>
 void IMergeTreeDataPart::writeMetadata(const String & filename, const WriteSettings & settings, Writer && writer)
 {
diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h
index 85ef0472ce7..e972bf2822f 100644
--- a/src/Storages/MergeTree/IMergeTreeDataPart.h
+++ b/src/Storages/MergeTree/IMergeTreeDataPart.h
@@ -208,6 +208,12 @@ public:
     /// otherwise, if the partition key includes dateTime column (also a common case), this function will return min and max values for that column.
     std::pair<time_t, time_t> getMinMaxTime() const;
 
+    /// Returns two timestamps: the min/max time at which data was inserted into this part.
+    /// These values do not require a special partition key in the part schema:
+    /// each part simply keeps two variables that are updated on inserts, merges and mutations.
+    time_t getMinTimeOfDataInsertion() const;
+    time_t getMaxTimeOfDataInsertion() const;
+
     bool isEmpty() const { return rows_count == 0; }
 
     /// Compute part block id for zero level part. Otherwise throws an exception.
@@ -235,6 +241,10 @@ public:
     std::optional<size_t> existing_rows_count;
 
     time_t modification_time = 0;
+
+    std::optional<time_t> min_time_of_data_insert;
+    std::optional<time_t> max_time_of_data_insert;
+
     /// When the part is removed from the working set. Changes once.
     mutable std::atomic<time_t> remove_time { std::numeric_limits<time_t>::max() };
 
@@ -496,6 +506,9 @@ public:
     /// reference counter locally.
     static constexpr auto FILE_FOR_REFERENCES_CHECK = "checksums.txt";
 
+    /// File with info about the min/max time when data was inserted into the part.
+    static constexpr auto MIN_MAX_TIME_OF_DATA_INSERT_FILE = "min_max_time_of_data_insert.txt";
+
     /// Checks that all TTLs (table min/max, column ttls, so on) for part
     /// calculated. Part without calculated TTL may exist if TTL was added after
     /// part creation (using alter query with materialize_ttl setting).
@@ -721,6 +734,8 @@ private:
     /// any specifial compression.
void loadDefaultCompressionCodec();
 
+    void loadInsertTimeInfo();
+
     void writeColumns(const NamesAndTypesList & columns_, const WriteSettings & settings);
     void writeVersionMetadata(const VersionMetadata & version_, bool fsync_part_dir) const;
 
diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp
index fc64fae9a58..cac99a9c19f 100644
--- a/src/Storages/MergeTree/MergeTask.cpp
+++ b/src/Storages/MergeTree/MergeTask.cpp
@@ -266,6 +266,8 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
     };
 
     SerializationInfoByName infos(global_ctx->storage_columns, info_settings);
+    time_t min_insert_time_res = global_ctx->future_part->parts.front()->getMinTimeOfDataInsertion();
+    time_t max_insert_time_res = global_ctx->future_part->parts.front()->getMaxTimeOfDataInsertion();
 
     for (const auto & part : global_ctx->future_part->parts)
     {
@@ -290,8 +292,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
 
             infos.add(part_infos);
         }
+
+        min_insert_time_res = std::min(min_insert_time_res, part->getMinTimeOfDataInsertion());
+        max_insert_time_res = std::max(max_insert_time_res, part->getMaxTimeOfDataInsertion());
     }
 
+    global_ctx->new_data_part->max_time_of_data_insert = max_insert_time_res;
+    global_ctx->new_data_part->min_time_of_data_insert = min_insert_time_res;
+
     const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
     if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)
         ctx->need_remove_expired_values = true;
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 49888596fbb..c8e45a61571 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -7412,6 +7412,9 @@ std::pair MergeTreeData::cloneAn
     dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
 
     dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
+
+    // dst_data_part->min_time_of_data_insert = src_part->getMinTimeOfDataInsertion();
+    // dst_data_part->max_time_of_data_insert = src_part->getMaxTimeOfDataInsertion();
     return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
 }
 
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.cpp b/src/Storages/MergeTree/MergeTreePartsMover.cpp
index 9223d6fd5b1..89b153d3e49 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.cpp
+++ b/src/Storages/MergeTree/MergeTreePartsMover.cpp
@@ -13,27 +13,38 @@ namespace ErrorCodes
 {
     extern const int ABORTED;
     extern const int DIRECTORY_ALREADY_EXISTS;
+    extern const int BAD_ARGUMENTS;
 }
 
 namespace
 {
 
+struct PartsComparatorBySizeOnDisk
+{
+    bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
+    {
+        /// If parts have equal sizes, then order them by names (names are unique)
+        UInt64 first_part_size = f->getBytesOnDisk();
+        UInt64 second_part_size = s->getBytesOnDisk();
+        return std::tie(first_part_size, f->name) < std::tie(second_part_size, s->name);
+    }
+};
+
+struct PartsComparatorByOldestData
+{
+    bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
+    {
+        return std::forward_as_tuple(f->getMinTimeOfDataInsertion(), f->getMaxTimeOfDataInsertion()) >
+            std::forward_as_tuple(s->getMinTimeOfDataInsertion(), s->getMaxTimeOfDataInsertion());
+    }
+};
+
 /// Contains minimal number of heaviest parts, which sum size on disk is greater than required.
 /// If there are not enough summary size, than contains all parts.
+template <typename PartsComparator>
 class LargestPartsWithRequiredSize
 {
-    struct PartsSizeOnDiskComparator
-    {
-        bool operator()(const MergeTreeData::DataPartPtr & f, const MergeTreeData::DataPartPtr & s) const
-        {
-            /// If parts have equal sizes, than order them by names (names are unique)
-            UInt64 first_part_size = f->getBytesOnDisk();
-            UInt64 second_part_size = s->getBytesOnDisk();
-            return std::tie(first_part_size, f->name) < std::tie(second_part_size, s->name);
-        }
-    };
-
-    std::set<MergeTreeData::DataPartPtr, PartsSizeOnDiskComparator> elems;
+    std::set<MergeTreeData::DataPartPtr, PartsComparator> elems;
     UInt64 required_size_sum;
     UInt64 current_size_sum = 0;
 
@@ -50,7 +61,7 @@ public:
         }
 
         /// Adding smaller element
-        if (!elems.empty() && (*elems.begin())->getBytesOnDisk() >= part->getBytesOnDisk())
+        if (!elems.empty() && PartsComparator()(part, *elems.begin()))
             return;
 
         elems.emplace(part);
@@ -88,7 +99,8 @@ private:
 
 }
 
-bool MergeTreePartsMover::selectPartsForMove(
+template <typename PartsComparator>
+bool MergeTreePartsMover::selectPartsForMoveImpl(
     MergeTreeMovingParts & parts_to_move,
     const AllowedMovingPredicate & can_move,
     const std::lock_guard<std::mutex> & /* moving_parts_lock */)
@@ -102,10 +114,9 @@ bool MergeTreePartsMover::selectPartsForMove(
     if (data_parts.empty())
         return false;
 
-    std::unordered_map<DiskPtr, LargestPartsWithRequiredSize> need_to_move;
+    std::unordered_map<DiskPtr, LargestPartsWithRequiredSize<PartsComparator>> need_to_move;
     const auto policy = data->getStoragePolicy();
     const auto & volumes = policy->getVolumes();
-
     if (!volumes.empty())
     {
         /// Do not check last volume
@@ -209,6 +220,26 @@ bool MergeTreePartsMover::selectPartsForMove(
     return false;
 }
 
+bool MergeTreePartsMover::selectPartsForMove(
+    MergeTreeMovingParts & parts_to_move,
+    const AllowedMovingPredicate & can_move,
+    const std::lock_guard<std::mutex> & moving_parts_lock)
+{
+    IStoragePolicy::MovePolicy move_policy = data->getStoragePolicy()->getMovePolicy();
+    if (move_policy == IStoragePolicy::MovePolicy::BY_PART_SIZE)
+    {
+        return selectPartsForMoveImpl<PartsComparatorBySizeOnDisk>(parts_to_move, can_move, moving_parts_lock);
+    }
+    else if (move_policy == IStoragePolicy::MovePolicy::BY_INSERT_DATA_TIME)
+    {
+        return selectPartsForMoveImpl<PartsComparatorByOldestData>(parts_to_move, can_move, moving_parts_lock);
+    }
+    else
+    {
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown move policy.");
+    }
+}
+
 MergeTreePartsMover::TemporaryClonedPart MergeTreePartsMover::clonePart(const MergeTreeMoveEntry & moving_part, const ReadSettings & read_settings, const WriteSettings & write_settings) const
 {
     auto cancellation_hook = [&moves_blocker_ = moves_blocker]()
diff --git a/src/Storages/MergeTree/MergeTreePartsMover.h b/src/Storages/MergeTree/MergeTreePartsMover.h
index 3cf270946d8..c00f0b970d5 100644
--- a/src/Storages/MergeTree/MergeTreePartsMover.h
+++ b/src/Storages/MergeTree/MergeTreePartsMover.h
@@ -4,8 +4,9 @@
 #include
 #include
 #include
-#include
 #include
+#include
+#include
 #include
 #include
 
@@ -44,6 +45,12 @@ private:
     /// Callback tells that part is not participating in background process
     using AllowedMovingPredicate = std::function<bool(const std::shared_ptr<const IMergeTreeDataPart> &, String * reason)>;
 
+    template <typename PartsComparator>
+    bool selectPartsForMoveImpl(
+        MergeTreeMovingParts & parts_to_move,
+        const AllowedMovingPredicate & can_move,
+        const std::lock_guard<std::mutex> & moving_parts_lock);
+
 public:
 
     explicit MergeTreePartsMover(MergeTreeData * data_)
diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
index 4ee68580d3f..3a00c5cf431 100644
--- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp
+++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp
@@ -195,11 +195,18 @@ MergedBlockOutputStream::Finalizer MergedBlockOutputStream::finalizePartAsync(
     }
 
     auto finalizer = std::make_unique<Finalizer::Impl>(*writer, new_part,
files_to_remove_after_sync, sync); + auto current_time = time(nullptr); + if (!new_part->min_time_of_data_insert.has_value() && !new_part->max_time_of_data_insert.has_value()) + { + new_part->min_time_of_data_insert = current_time; + new_part->max_time_of_data_insert = current_time; + } + new_part->modification_time = current_time; + if (new_part->isStoredOnDisk()) finalizer->written_files = finalizePartOnDisk(new_part, checksums); new_part->rows_count = rows_count; - new_part->modification_time = time(nullptr); new_part->setIndex(writer->releaseIndexColumns()); new_part->checksums = checksums; new_part->setBytesOnDisk(checksums.getTotalSizeOnDisk()); @@ -308,6 +315,20 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis written_files.emplace_back(std::move(out)); } + { + auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, 4096, write_settings); + HashingWriteBuffer out_hashing(*out); + DB::writeIntText(*new_part->min_time_of_data_insert, out_hashing); + DB::writeText(" ", out_hashing); + DB::writeIntText(*new_part->max_time_of_data_insert, out_hashing); + out_hashing.finalize(); + checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_size = out_hashing.count(); + checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_hash = out_hashing.getHash(); + + out->preFinalize(); + written_files.emplace_back(std::move(out)); + } + { /// Write a file with a description of columns. auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings); diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 9a775db73e2..b5684d4c96e 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -2236,6 +2236,9 @@ bool MutateTask::prepare() ctx->new_data_part->is_temp = true; ctx->new_data_part->ttl_infos = ctx->source_part->ttl_infos; + ctx->new_data_part->min_time_of_data_insert = ctx->future_part->parts.front()->getMinTimeOfDataInsertion(); + ctx->new_data_part->max_time_of_data_insert = ctx->future_part->parts.front()->getMaxTimeOfDataInsertion(); + /// It shouldn't be changed by mutation. ctx->new_data_part->index_granularity_info = ctx->source_part->index_granularity_info; diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp index 1b800fd64a9..602dce7133f 100644 --- a/src/Storages/System/StorageSystemParts.cpp +++ b/src/Storages/System/StorageSystemParts.cpp @@ -62,6 +62,8 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_) {"secondary_indices_uncompressed_bytes", std::make_shared(), "Total size of uncompressed data for secondary indices in the data part. All the auxiliary files (for example, files with marks) are not included."}, {"secondary_indices_marks_bytes", std::make_shared(), "The size of the file with marks for secondary indices."}, {"modification_time", std::make_shared(), "The time the directory with the data part was modified. This usually corresponds to the time of data part creation."}, + {"min_time_of_data_insert", std::make_shared(), "min_time_of_data_insert."}, + {"max_time_of_data_insert", std::make_shared(), "max_time_of_data_insert."}, {"remove_time", std::make_shared(), "The time when the data part became inactive."}, {"refcount", std::make_shared(), "The number of places where the data part is used. 
A value greater than 2 indicates that the data part is used in queries or merges."}, {"min_date", std::make_shared(), "The minimum value of the date key in the data part."}, @@ -181,6 +183,11 @@ void StorageSystemParts::processNextStorage( if (columns_mask[src_index++]) columns[res_index++]->insert(static_cast(part->modification_time)); + if (columns_mask[src_index++]) + columns[res_index++]->insert(static_cast(part->getMinTimeOfDataInsertion())); + if (columns_mask[src_index++]) + columns[res_index++]->insert(static_cast(part->getMaxTimeOfDataInsertion())); + if (columns_mask[src_index++]) { time_t remove_time = part->remove_time.load(std::memory_order_relaxed); diff --git a/src/Storages/System/StorageSystemStoragePolicies.cpp b/src/Storages/System/StorageSystemStoragePolicies.cpp index 21251136f7d..9b4383f8e5d 100644 --- a/src/Storages/System/StorageSystemStoragePolicies.cpp +++ b/src/Storages/System/StorageSystemStoragePolicies.cpp @@ -44,6 +44,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const StorageID & tab {"volume_type", std::make_shared(getTypeEnumValues()), "The type of the volume - JBOD or a single disk."}, {"max_data_part_size", std::make_shared(), "the maximum size of a part that can be stored on any of the volumes disks."}, {"move_factor", std::make_shared(), "When the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1)."}, + {"move_policy", std::make_shared(getTypeEnumValues())}, {"prefer_not_to_merge", std::make_shared(), "You should not use this setting. Disables merging of data parts on this volume (this is harmful and leads to performance degradation)."}, {"perform_ttl_move_on_insert", std::make_shared(), "Disables TTL move on data part INSERT. By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule."}, {"load_balancing", std::make_shared(getTypeEnumValues()), "Policy for disk balancing, `round_robin` or `least_used`."} @@ -70,6 +71,7 @@ Pipe StorageSystemStoragePolicies::read( MutableColumnPtr col_volume_type = ColumnInt8::create(); MutableColumnPtr col_max_part_size = ColumnUInt64::create(); MutableColumnPtr col_move_factor = ColumnFloat32::create(); + MutableColumnPtr col_move_policy = ColumnInt8::create(); MutableColumnPtr col_prefer_not_to_merge = ColumnUInt8::create(); MutableColumnPtr col_perform_ttl_move_on_insert = ColumnUInt8::create(); MutableColumnPtr col_load_balancing = ColumnInt8::create(); @@ -90,6 +92,7 @@ Pipe StorageSystemStoragePolicies::read( col_volume_type->insert(static_cast(volumes[i]->getType())); col_max_part_size->insert(volumes[i]->max_data_part_size); col_move_factor->insert(policy_ptr->getMoveFactor()); + col_move_policy->insert(static_cast(policy_ptr->getMovePolicy())); col_prefer_not_to_merge->insert(volumes[i]->areMergesAvoided() ? 
1 : 0);
         col_perform_ttl_move_on_insert->insert(volumes[i]->perform_ttl_move_on_insert);
         col_load_balancing->insert(static_cast(volumes[i]->load_balancing));
@@ -104,6 +107,7 @@ Pipe StorageSystemStoragePolicies::read(
     res_columns.emplace_back(std::move(col_volume_type));
     res_columns.emplace_back(std::move(col_max_part_size));
     res_columns.emplace_back(std::move(col_move_factor));
+    res_columns.emplace_back(std::move(col_move_policy));
     res_columns.emplace_back(std::move(col_prefer_not_to_merge));
     res_columns.emplace_back(std::move(col_perform_ttl_move_on_insert));
     res_columns.emplace_back(std::move(col_load_balancing));
diff --git a/tests/integration/test_insert_data_time/__init__.py b/tests/integration/test_insert_data_time/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_insert_data_time/configs/cluster.xml b/tests/integration/test_insert_data_time/configs/cluster.xml
new file mode 100644
index 00000000000..84d16206080
--- /dev/null
+++ b/tests/integration/test_insert_data_time/configs/cluster.xml
@@ -0,0 +1,16 @@
+<clickhouse>
+    <remote_servers>
+        <cluster>
+            <shard>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </cluster>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_insert_data_time/configs/macro.xml b/tests/integration/test_insert_data_time/configs/macro.xml
new file mode 100644
index 00000000000..0e468d7b81b
--- /dev/null
+++ b/tests/integration/test_insert_data_time/configs/macro.xml
@@ -0,0 +1,5 @@
+<clickhouse>
+    <macros>
+        <cluster>cluster</cluster>
+    </macros>
+</clickhouse>
\ No newline at end of file
diff --git a/tests/integration/test_insert_data_time/test.py b/tests/integration/test_insert_data_time/test.py
new file mode 100644
index 00000000000..86de6292828
--- /dev/null
+++ b/tests/integration/test_insert_data_time/test.py
@@ -0,0 +1,200 @@
+import pytest
+import logging
+import time
+
+from helpers.cluster import ClickHouseCluster
+
+cluster = ClickHouseCluster(__file__)
+
+node1 = cluster.add_instance(
+    "node1",
+    with_zookeeper=True,
+    main_configs=["configs/cluster.xml", "configs/macro.xml"],
+    macros={"replica": "node1"},
+)
+node2 = cluster.add_instance(
+    "node2",
+    with_zookeeper=True,
+    main_configs=["configs/cluster.xml", "configs/macro.xml"],
+    macros={"replica": "node2"},
+)
+
+node_old = cluster.add_instance(
+    "node_with_old_ch",
+    image="clickhouse/clickhouse-server",
+    tag="24.3",
+    with_installed_binary=True,
+    stay_alive=True,
+)
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        yield cluster
+
+    finally:
+        cluster.shutdown()
+
+
+def get_max_min_time_of_data_insert(node, db_name, table_name):
+    return (
+        node.query(
+            f"SELECT min(min_time_of_data_insert), max(max_time_of_data_insert) FROM system.parts WHERE database='{db_name}' AND table='{table_name}' AND active=1"
+        )
+        .strip()
+        .split("\t")
+    )
+
+
+def test_merge(started_cluster):
+    db_name = "test_db"
+    table_name = "test_table"
+    node = node1
+
+    node.query(f"DROP DATABASE IF EXISTS {db_name}")
+    node.query(f"CREATE DATABASE {db_name}")
+    node.query(
+        f"CREATE TABLE {db_name}.{table_name} (a int) ENGINE = MergeTree() ORDER BY a"
+    )
+    node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1")
+    time.sleep(1)
+    node.query(f"INSERT INTO {db_name}.{table_name} SELECT 2")
+    [min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name)
+
+    print(min_time, max_time)
+    assert min_time != max_time
+
+    node.query(f"OPTIMIZE TABLE {db_name}.{table_name}")
+    [min_time_new, max_time_new] = get_max_min_time_of_data_insert(
+        node, db_name, table_name
+    )
+
+    assert min_time_new == min_time and max_time_new == max_time
+
+
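For reference, the helpers above read the new columns through `system.parts`; the same information lives on disk in `min_max_time_of_data_insert.txt`, which `finalizePartOnDisk()` in the first patch writes as two epoch timestamps separated by a single space. A minimal sketch of a standalone parser for that file, under that format assumption (the function name is illustrative, not part of the patch):

```python
def parse_min_max_insert_time(path):
    # File format per MergedBlockOutputStream::finalizePartOnDisk: "<min> <max>",
    # both plain decimal epoch seconds, separated by one space.
    with open(path) as f:
        min_str, max_str = f.read().split(" ", 1)
    return int(min_str), int(max_str)
```
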
+def test_mutations(started_cluster): + db_name = "test_db" + table_name = "test_table" + node = node1 + + node.query(f"DROP DATABASE IF EXISTS {db_name}") + node.query(f"CREATE DATABASE {db_name}") + node.query( + f"CREATE TABLE {db_name}.{table_name} (a int, b int) ENGINE = MergeTree() ORDER BY a" + ) + node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1, 1") + [min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name) + print(min_time, max_time) + assert min_time == max_time + + time.sleep(1) + node.query(f"ALTER TABLE {db_name}.{table_name} UPDATE b = 2 WHERE b = 1") + [min_time_new, max_time_new] = get_max_min_time_of_data_insert( + node, db_name, table_name + ) + + assert min_time == min_time_new and max_time == max_time_new + + +def test_move_partition(started_cluster): + db_name = "test_db" + table_name1 = "test_table1" + table_name2 = "test_table2" + node = node1 + + node.query(f"DROP DATABASE IF EXISTS {db_name}") + node.query(f"CREATE DATABASE {db_name}") + node.query( + f"CREATE TABLE {db_name}.{table_name1} (a int, b int) ENGINE = MergeTree() ORDER BY a PARTITION BY a" + ) + node.query( + f"CREATE TABLE {db_name}.{table_name2} (a int, b int) ENGINE = MergeTree() ORDER BY a PARTITION BY a" + ) + node.query(f"INSERT INTO {db_name}.{table_name1} SELECT 1, 1") + [min_time, max_time] = get_max_min_time_of_data_insert(node, db_name, table_name1) + + partition_name = ( + node.query( + f"SELECT partition FROM system.parts where database='{db_name}' AND table='{table_name1}' AND active=1" + ) + .strip() + .split("\t") + )[0] + assert min_time == max_time + + time.sleep(1) + node.query( + f"ALTER TABLE {db_name}.{table_name1} MOVE PARTITION '{partition_name}' TO TABLE {db_name}.{table_name2}" + ) + [min_time_new, max_time_new] = get_max_min_time_of_data_insert( + node, db_name, table_name2 + ) + + assert min_time == min_time_new and max_time == max_time_new + + +def test_replicated_fetch(started_cluster): + db_name = "test_db" + table_name = "test_table" + + node1.query(f"DROP DATABASE IF EXISTS {db_name} ON CLUSTER '{{cluster}}'") + node1.query(f"CREATE DATABASE {db_name} ON CLUSTER '{{cluster}}'") + node1.query( + f"CREATE TABLE {db_name}.{table_name} ON CLUSTER '{{cluster}}' (a int) ENGINE = ReplicatedMergeTree('/clickhouse/tables/test_table/replicated', '{{replica}}') ORDER BY a" + ) + node1.query(f"INSERT INTO {db_name}.{table_name} SELECT 1") + + [min_time_node1, max_time_node1] = get_max_min_time_of_data_insert( + node1, db_name, table_name + ) + [min_time_node2, max_time_node2] = get_max_min_time_of_data_insert( + node2, db_name, table_name + ) + assert min_time_node1 == min_time_node2 and max_time_node1 == max_time_node2 + + node2.query(f"INSERT INTO {db_name}.{table_name} SELECT 2") + node2.query(f"OPTIMIZE TABLE {db_name}.{table_name}") + + [min_time_node1, max_time_node1] = get_max_min_time_of_data_insert( + node1, db_name, table_name + ) + [min_time_node2, max_time_node2] = get_max_min_time_of_data_insert( + node2, db_name, table_name + ) + assert min_time_node1 == min_time_node2 and max_time_node1 == max_time_node2 + + +def test_version_compatibility(started_cluster): + db_name = "test_db" + table_name = "test_table" + node = node_old + + node.query(f"DROP DATABASE IF EXISTS {db_name}") + node.query(f"CREATE DATABASE {db_name}") + node.query( + f"CREATE TABLE {db_name}.{table_name} (a int) ENGINE = MergeTree() ORDER BY a" + ) + node.query(f"INSERT INTO {db_name}.{table_name} SELECT 1") + + modification_time = ( + node.query( + f"SELECT 
modification_time FROM system.parts WHERE database='{db_name}' AND table='{table_name}' AND active=1"
+        )
+        .strip()
+        .split("\t")
+    )[0]
+
+    node.restart_with_latest_version()
+
+    # For old parts, the min/max insert time falls back to the part's modification time.
+    [min_time_node, max_time_node] = get_max_min_time_of_data_insert(
+        node, db_name, table_name
+    )
+    assert min_time_node == modification_time and max_time_node == modification_time
+
+    node.query(f"INSERT INTO {db_name}.{table_name} SELECT 2")
+    node.restart_with_original_version()
+    assert node.query(f"SELECT count() FROM {db_name}.{table_name}") == "2\n"
diff --git a/tests/integration/test_move_policy_jbod/__init__.py b/tests/integration/test_move_policy_jbod/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/tests/integration/test_move_policy_jbod/configs/config.d/storage_configuration.xml b/tests/integration/test_move_policy_jbod/configs/config.d/storage_configuration.xml
new file mode 100644
index 00000000000..f2814e1ce32
--- /dev/null
+++ b/tests/integration/test_move_policy_jbod/configs/config.d/storage_configuration.xml
@@ -0,0 +1,57 @@
+<clickhouse>
+    <storage_configuration>
+        <disks>
+            <hot>
+                <path>/hot/</path>
+            </hot>
+            <warm>
+                <path>/warm/</path>
+            </warm>
+            <cold>
+                <path>/cold/</path>
+            </cold>
+        </disks>
+        <policies>
+            <jbod_by_size_policy>
+                <volumes>
+                    <hot>
+                        <disk>hot</disk>
+                    </hot>
+                    <warm>
+                        <disk>warm</disk>
+                    </warm>
+                    <cold>
+                        <disk>cold</disk>
+                    </cold>
+                </volumes>
+                <move_factor>0.5</move_factor>
+                <move_policy>by_part_size</move_policy>
+            </jbod_by_size_policy>
+            <jbod_time_policy>
+                <volumes>
+                    <hot>
+                        <disk>hot</disk>
+                    </hot>
+                    <warm>
+                        <disk>warm</disk>
+                    </warm>
+                    <cold>
+                        <disk>cold</disk>
+                    </cold>
+                </volumes>
+                <move_factor>0.5</move_factor>
+                <move_policy>by_insert_data_time</move_policy>
+            </jbod_time_policy>
+        </policies>
+    </storage_configuration>
+1
+0.1
+0.1
+0.1
+</clickhouse>
diff --git a/tests/integration/test_move_policy_jbod/configs/remote_servers.xml b/tests/integration/test_move_policy_jbod/configs/remote_servers.xml
new file mode 100644
index 00000000000..ea4769f55e1
--- /dev/null
+++ b/tests/integration/test_move_policy_jbod/configs/remote_servers.xml
@@ -0,0 +1,17 @@
+<clickhouse>
+    <remote_servers>
+        <test_cluster>
+            <shard>
+                <internal_replication>true</internal_replication>
+                <replica>
+                    <host>node1</host>
+                    <port>9000</port>
+                </replica>
+                <replica>
+                    <host>node2</host>
+                    <port>9000</port>
+                </replica>
+            </shard>
+        </test_cluster>
+    </remote_servers>
+</clickhouse>
diff --git a/tests/integration/test_move_policy_jbod/test.py b/tests/integration/test_move_policy_jbod/test.py
new file mode 100644
index 00000000000..5d549f0d1f3
--- /dev/null
+++ b/tests/integration/test_move_policy_jbod/test.py
@@ -0,0 +1,150 @@
+import random
+import time
+from multiprocessing.dummy import Pool
+import datetime
+
+import pytest
+from helpers.client import QueryRuntimeException
+from helpers.cluster import ClickHouseCluster
+
+hot_volume_size_mb = 5
+warm_volume_size_mb = 10
+cold_volume_size_mb = 15
+mb_in_bytes = 1024 * 1024
+
+node_options = dict(
+    with_zookeeper=True,
+    main_configs=[
+        "configs/remote_servers.xml",
+        "configs/config.d/storage_configuration.xml",
+    ],
+    tmpfs=[
+        f"/hot:size={hot_volume_size_mb}M",
+        f"/warm:size={warm_volume_size_mb}M",
+        f"/cold:size={cold_volume_size_mb}M",
+    ],
+)
+
+cluster = ClickHouseCluster(__file__)
+node1 = cluster.add_instance("node1", macros={"shard": 0, "replica": 1}, **node_options)
+node2 = cluster.add_instance("node2", macros={"shard": 0, "replica": 2}, **node_options)
+nodes = [node1, node2]
+
+
+@pytest.fixture(scope="module")
+def started_cluster():
+    try:
+        cluster.start()
+
+        yield cluster
+    except Exception as ex:
+        print(ex)
+    finally:
+        cluster.shutdown()
+
+
+def wait_until_moves_finished(node, required_part_count, disk):
+    retry_count = 20
+    sleep_time = 1
+    for _ in range(retry_count):
+        try:
+            parts_on_disk = int(
+                node.query(f"SELECT count() FROM system.parts WHERE disk_name='{disk}'")
+            )
+            if parts_on_disk <= required_part_count:
+                return True
+        except Exception:
+            pass
+        time.sleep(sleep_time)
+    return False
+
+
+def check_by_insert_time_parts_disks(node, database):
+    res = node.query(
+        f"SELECT disk_name, toUnixTimestamp(min(min_time_of_data_insert)) AS min_time, toUnixTimestamp(max(min_time_of_data_insert)) AS max_time FROM system.parts WHERE database ='{database}' GROUP BY disk_name"
+    )
+
+    times_of_parts = {}
+    for line in res.splitlines():
+        [disk_name, min_time, max_time] = line.split("\t")
+        times_of_parts[disk_name] = (int(min_time), int(max_time))
+
+    # Data on colder disks must be older: min_time on a colder disk must not exceed max_time on a hotter one.
+    assert (
+        times_of_parts["cold"][0] <= times_of_parts["hot"][1]
+        and times_of_parts["cold"][0] <= times_of_parts["warm"][1]
+    )
+    assert times_of_parts["warm"][0] <= times_of_parts["hot"][1]
+
+
+@pytest.mark.parametrize(
+    "storage_policy,additional_check",
+    [
+        ("jbod_by_size_policy", None),
+        ("jbod_time_policy", check_by_insert_time_parts_disks),
+    ],
+)
+def test_simple_moves(started_cluster, storage_policy, additional_check):
+    node = node1
+
+    node.query("DROP DATABASE IF EXISTS test_db SYNC;")
+    node.query("CREATE DATABASE test_db;")
+
+    node.query(
+        f"CREATE TABLE test_db.table (a Int, b String) ENGINE=MergeTree() ORDER BY a SETTINGS storage_policy='{storage_policy}'"
+    )
+
+    node.query(f"SYSTEM STOP MERGES test_db.table;")
+
+    for _ in range(15):
+        node.query(
+            f"INSERT INTO test_db.table SELECT rand()%10, randomString({mb_in_bytes});"
+        )
+        time_last_data_insert = int(time.time())
+        assert wait_until_moves_finished(node, hot_volume_size_mb // 2, "hot")
+        assert wait_until_moves_finished(node, warm_volume_size_mb // 2, "warm")
+        # Make sure that the times of data inserts are unique
+        if int(time.time()) == time_last_data_insert:
+            time.sleep(1)
+
+    if additional_check:
+        additional_check(node, "test_db")
+    node.query(f"DROP DATABASE test_db SYNC;")
+
+
+@pytest.mark.parametrize(
+    "storage_policy,additional_check",
+    [
+        ("jbod_by_size_policy", None),
+        ("jbod_time_policy", check_by_insert_time_parts_disks),
+    ],
+)
+def test_moves_replicated(started_cluster, storage_policy, additional_check):
+    node1.query("DROP DATABASE IF EXISTS test_db ON CLUSTER 'test_cluster' SYNC;")
+    node1.query("CREATE DATABASE test_db ON CLUSTER 'test_cluster';")
+    # Here we need to block both merge execution and scheduling; otherwise parts will be in the `virtual` state
+    # and moves of these parts will be blocked until the merge is completed.
+ node1.query( + f""" + CREATE TABLE test_db.table ON CLUSTER 'test_cluster' (a Int, b String) ENGINE=ReplicatedMergeTree('/clickhouse/tables/{{uuid}}', '{{replica}}') ORDER BY a + SETTINGS storage_policy='{storage_policy}', max_replicated_merges_in_queue=0; + """ + ) + node1.query(f"SYSTEM STOP MERGES ON CLUSTER 'test_cluster' test_db.table; ") + + for _ in range(15): + node1.query( + f"INSERT INTO test_db.table SELECT rand()%10, randomString({mb_in_bytes});" + ) + time_last_data_insert = int(time.time()) + assert wait_until_moves_finished(node1, hot_volume_size_mb // 2, "hot") + assert wait_until_moves_finished(node2, hot_volume_size_mb // 2, "hot") + assert wait_until_moves_finished(node1, warm_volume_size_mb // 2, "warm") + assert wait_until_moves_finished(node2, warm_volume_size_mb // 2, "warm") + if time_last_data_insert == int(time.time()): + time.sleep(1) + + if additional_check: + additional_check(node1, "test_db") + additional_check(node2, "test_db") + node1.query(f"DROP DATABASE test_db ON CLUSTER 'test_cluster' SYNC;") diff --git a/tests/integration/test_multiple_disks/test.py b/tests/integration/test_multiple_disks/test.py index e97ffeb4cc3..b7da9f45c08 100644 --- a/tests/integration/test_multiple_disks/test.py +++ b/tests/integration/test_multiple_disks/test.py @@ -101,6 +101,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -113,6 +114,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -125,6 +127,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -137,6 +140,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 1, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -149,6 +153,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -161,6 +166,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -173,6 +179,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "10485760", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -185,6 +192,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -197,6 +205,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.7, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 
1, "load_balancing": "ROUND_ROBIN", @@ -209,6 +218,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.7, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -221,6 +231,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "2097152", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -233,6 +244,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "20971520", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -245,6 +257,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -257,6 +270,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "0", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -269,6 +283,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "1024", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", @@ -281,6 +296,7 @@ def test_system_tables(start_cluster): "volume_type": "JBOD", "max_data_part_size": "1024000000", "move_factor": 0.1, + "move_policy": "BY_PART_SIZE", "prefer_not_to_merge": 0, "perform_ttl_move_on_insert": 1, "load_balancing": "ROUND_ROBIN", From 690f6143ed23d3da59041b63fdd8323cc5022839 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 9 Sep 2024 08:13:38 +0000 Subject: [PATCH 2/3] Add doc and fix fasttests --- .../engines/table-engines/mergetree-family/mergetree.md | 4 +++- docs/en/operations/system-tables/parts.md | 6 ++++++ docs/en/operations/system-tables/storage_policies.md | 3 +++ src/Storages/MergeTree/MergeTreeData.cpp | 3 --- src/Storages/System/StorageSystemParts.cpp | 4 ++-- src/Storages/System/StorageSystemStoragePolicies.cpp | 2 +- ...0961_checksums_in_system_parts_columns_table.reference | 2 +- .../0_stateless/02117_show_create_table_system.reference | 3 +++ .../0_stateless/02253_empty_part_checksums.reference | 2 +- tests/queries/0_stateless/02361_fsync_profile_events.sh | 2 +- .../02381_compress_marks_and_primary_key.reference | 8 ++++---- 11 files changed, 25 insertions(+), 14 deletions(-) diff --git a/docs/en/engines/table-engines/mergetree-family/mergetree.md b/docs/en/engines/table-engines/mergetree-family/mergetree.md index 0b693775dde..d65783b21c4 100644 --- a/docs/en/engines/table-engines/mergetree-family/mergetree.md +++ b/docs/en/engines/table-engines/mergetree-family/mergetree.md @@ -777,6 +777,7 @@ Storage policies configuration markup: 0.2 + by_part_size @@ -794,7 +795,8 @@ Tags: - `volume_name_N` — Volume name. Volume names must be unique. - `disk` — a disk within a volume. - `max_data_part_size_bytes` — the maximum size of a part that can be stored on any of the volume’s disks. If the a size of a merged part estimated to be bigger than `max_data_part_size_bytes` then this part will be written to a next volume. 
Basically this feature allows to keep new/small parts on a hot (SSD) volume and move them to a cold (HDD) volume when they reach large size. Do not use this setting if your policy has only one volume.
-- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). ClickHouse sorts existing parts by size from largest to smallest (in descending order) and selects parts with the total size that is sufficient to meet the `move_factor` condition. If the total size of all parts is insufficient, all parts will be moved.
+- `move_factor` — when the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1). If the total size of all parts is insufficient, all parts will be moved.
+- `move_policy` - Policy for selecting which parts to move to the next volume until their total size is sufficient to meet the `move_factor` condition: `by_part_size` (largest parts first) or `by_insert_data_time` (parts with the oldest data first). Default: `by_part_size`.
 - `perform_ttl_move_on_insert` — Disables TTL move on data part INSERT. By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3). If disabled then already expired data part is written into a default volume and then right after moved to TTL volume.
 - `load_balancing` - Policy for disk balancing, `round_robin` or `least_used`.
 - `least_used_ttl_ms` - Configure timeout (in milliseconds) for the updating available space on all disks (`0` - update always, `-1` - never update, default is `60000`). Note, if the disk can be used by ClickHouse only and is not subject to a online filesystem resize/shrink you can use `-1`, in all other cases it is not recommended, since eventually it will lead to incorrect space distribution.
diff --git a/docs/en/operations/system-tables/parts.md b/docs/en/operations/system-tables/parts.md
index 8113b850a38..a31f96d439d 100644
--- a/docs/en/operations/system-tables/parts.md
+++ b/docs/en/operations/system-tables/parts.md
@@ -51,6 +51,10 @@ Columns:
 
 - `modification_time` ([DateTime](../../sql-reference/data-types/datetime.md)) – The time the directory with the data part was modified. This usually corresponds to the time of data part creation.
 
+- `min_time_of_data_insert` ([DateTime](../../sql-reference/data-types/datetime.md)) – The minimum time at which data was inserted into this part.
+
+- `max_time_of_data_insert` ([DateTime](../../sql-reference/data-types/datetime.md)) – The maximum time at which data was inserted into this part.
+
 - `remove_time` ([DateTime](../../sql-reference/data-types/datetime.md)) – The time when the data part became inactive.
 
 - `refcount` ([UInt32](../../sql-reference/data-types/int-uint.md)) – The number of places where the data part is used. A value greater than 2 indicates that the data part is used in queries or merges.
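The two columns make part age directly queryable. A hedged sketch of the kind of query they enable; `client` stands in for any object with a `.query()` method (such as the integration-test node instances above), and the helper name and threshold are illustrative:

```python
def parts_with_stale_data(client, database, table, days=30):
    # Lists active parts whose newest data is older than the given number of days.
    return client.query(
        "SELECT name, disk_name, max_time_of_data_insert "
        "FROM system.parts "
        f"WHERE database = '{database}' AND table = '{table}' "
        f"AND active AND max_time_of_data_insert < now() - INTERVAL {days} DAY"
    )
```
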
@@ -136,6 +140,8 @@
 secondary_indices_uncompressed_bytes: 6
 secondary_indices_marks_bytes: 48
 marks_bytes: 144
 modification_time: 2020-06-18 13:01:49
+min_time_of_data_insert: 2020-06-18 13:01:49
+max_time_of_data_insert: 2020-06-18 13:01:49
 remove_time: 1970-01-01 00:00:00
 refcount: 1
 min_date: 1970-01-01
diff --git a/docs/en/operations/system-tables/storage_policies.md b/docs/en/operations/system-tables/storage_policies.md
index 24271a943a4..a0df664b826 100644
--- a/docs/en/operations/system-tables/storage_policies.md
+++ b/docs/en/operations/system-tables/storage_policies.md
@@ -17,6 +17,9 @@ Columns:
     - `UNKNOWN`
 - `max_data_part_size` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Maximum size of a data part that can be stored on volume disks (0 — no limit).
 - `move_factor` ([Float64](../../sql-reference/data-types/float.md)) — Ratio of free disk space. When the ratio exceeds the value of configuration parameter, ClickHouse start to move data to the next volume in order.
+- `move_policy` ([Enum8](../../sql-reference/data-types/enum.md)) - Policy for selecting which parts to move to the next volume until their total size is sufficient to meet the `move_factor` condition. Can have one of the following values:
+    - `BY_PART_SIZE` - sorts existing parts by size from largest to smallest (in descending order).
+    - `BY_INSERT_DATA_TIME` - sorts existing parts by the time of data insertion (older parts first).
 - `prefer_not_to_merge` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Value of the `prefer_not_to_merge` setting. Should be always false. When this setting is enabled, you did a mistake.
 - `perform_ttl_move_on_insert` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Value of the `perform_ttl_move_on_insert` setting. — Disables TTL move on data part INSERT. By default if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule. This can significantly slowdown insert in case if destination volume/disk is slow (e.g. S3).
 - `load_balancing` ([Enum8](../../sql-reference/data-types/enum.md)) — Policy for disk balancing. Can have one of the following values:
diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp
index 3cae580768b..de670731d21 100644
--- a/src/Storages/MergeTree/MergeTreeData.cpp
+++ b/src/Storages/MergeTree/MergeTreeData.cpp
@@ -7431,9 +7431,6 @@ std::pair MergeTreeData::cloneAn
     dst_data_part->loadColumnsChecksumsIndexes(require_part_metadata, true);
 
     dst_data_part->modification_time = dst_part_storage->getLastModified().epochTime();
-
-    // dst_data_part->min_time_of_data_insert = src_part->getMinTimeOfDataInsertion();
-    // dst_data_part->max_time_of_data_insert = src_part->getMaxTimeOfDataInsertion();
     return std::make_pair(dst_data_part, std::move(temporary_directory_lock));
 }
 
diff --git a/src/Storages/System/StorageSystemParts.cpp b/src/Storages/System/StorageSystemParts.cpp
index 602dce7133f..5355251619a 100644
--- a/src/Storages/System/StorageSystemParts.cpp
+++ b/src/Storages/System/StorageSystemParts.cpp
@@ -62,8 +62,8 @@ StorageSystemParts::StorageSystemParts(const StorageID & table_id_)
     {"secondary_indices_uncompressed_bytes", std::make_shared(), "Total size of uncompressed data for secondary indices in the data part.
All the auxiliary files (for example, files with marks) are not included."},
     {"secondary_indices_marks_bytes", std::make_shared(), "The size of the file with marks for secondary indices."},
     {"modification_time", std::make_shared(), "The time the directory with the data part was modified. This usually corresponds to the time of data part creation."},
-    {"min_time_of_data_insert", std::make_shared(), "min_time_of_data_insert."},
-    {"max_time_of_data_insert", std::make_shared(), "max_time_of_data_insert."},
+    {"min_time_of_data_insert", std::make_shared(), "The minimum time at which data was inserted into this part."},
+    {"max_time_of_data_insert", std::make_shared(), "The maximum time at which data was inserted into this part."},
     {"remove_time", std::make_shared(), "The time when the data part became inactive."},
     {"refcount", std::make_shared(), "The number of places where the data part is used. A value greater than 2 indicates that the data part is used in queries or merges."},
     {"min_date", std::make_shared(), "The minimum value of the date key in the data part."},
diff --git a/src/Storages/System/StorageSystemStoragePolicies.cpp b/src/Storages/System/StorageSystemStoragePolicies.cpp
index 9b4383f8e5d..58aa347f417 100644
--- a/src/Storages/System/StorageSystemStoragePolicies.cpp
+++ b/src/Storages/System/StorageSystemStoragePolicies.cpp
@@ -44,7 +44,7 @@ StorageSystemStoragePolicies::StorageSystemStoragePolicies(const StorageID & tab
     {"volume_type", std::make_shared(getTypeEnumValues()), "The type of the volume - JBOD or a single disk."},
     {"max_data_part_size", std::make_shared(), "the maximum size of a part that can be stored on any of the volumes disks."},
     {"move_factor", std::make_shared(), "When the amount of available space gets lower than this factor, data automatically starts to move on the next volume if any (by default, 0.1)."},
-    {"move_policy", std::make_shared(getTypeEnumValues())},
+    {"move_policy", std::make_shared(getTypeEnumValues()), "Policy for selecting which parts to move to the next volume: `by_part_size` or `by_insert_data_time`."},
     {"prefer_not_to_merge", std::make_shared(), "You should not use this setting. Disables merging of data parts on this volume (this is harmful and leads to performance degradation)."},
     {"perform_ttl_move_on_insert", std::make_shared(), "Disables TTL move on data part INSERT.
By default (if enabled) if we insert a data part that already expired by the TTL move rule it immediately goes to a volume/disk declared in move rule."}, {"load_balancing", std::make_shared(getTypeEnumValues()), "Policy for disk balancing, `round_robin` or `least_used`."} diff --git a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference index 4bf3cfe65a2..4a896a46fed 100644 --- a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference +++ b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference @@ -1 +1 @@ -20000101_1_1_0 test_00961 e4ed027389c208d2b5fce9c4ef1ca42c 4c23d7f5920f89aefc3b062b646cd23d 908ddf2b1d0af239da96ff1e527a8a1f +20000101_1_1_0 test_00961 812036551f93a3685116fa6169d36fa9 2ca3d19eefed0ce7b4e7627e443b9a59 908ddf2b1d0af239da96ff1e527a8a1f diff --git a/tests/queries/0_stateless/02117_show_create_table_system.reference b/tests/queries/0_stateless/02117_show_create_table_system.reference index 638a46a142f..937b79d42a6 100644 --- a/tests/queries/0_stateless/02117_show_create_table_system.reference +++ b/tests/queries/0_stateless/02117_show_create_table_system.reference @@ -471,6 +471,8 @@ CREATE TABLE system.parts `secondary_indices_uncompressed_bytes` UInt64, `secondary_indices_marks_bytes` UInt64, `modification_time` DateTime, + `min_time_of_data_insert` DateTime, + `max_time_of_data_insert` DateTime, `remove_time` DateTime, `refcount` UInt32, `min_date` Date, @@ -1056,6 +1058,7 @@ CREATE TABLE system.storage_policies `volume_type` Enum8('JBOD' = 0, 'SINGLE_DISK' = 1, 'UNKNOWN' = 2), `max_data_part_size` UInt64, `move_factor` Float32, + `move_policy` Enum8('BY_PART_SIZE' = 0, 'BY_INSERT_DATA_TIME' = 1), `prefer_not_to_merge` UInt8, `perform_ttl_move_on_insert` UInt8, `load_balancing` Enum8('ROUND_ROBIN' = 0, 'LEAST_USED' = 1) diff --git a/tests/queries/0_stateless/02253_empty_part_checksums.reference b/tests/queries/0_stateless/02253_empty_part_checksums.reference index 65a8c9ee65e..0f2374adab9 100644 --- a/tests/queries/0_stateless/02253_empty_part_checksums.reference +++ b/tests/queries/0_stateless/02253_empty_part_checksums.reference @@ -5,4 +5,4 @@ 0 1 0 -0_0_0_0 Wide 370db59d5dcaef5d762b11d319c368c7 514a8be2dac94fd039dbd230065e58a4 b324ada5cd6bb14402c1e59200bd003a +0_0_0_0 Wide 3f2fc7b294c1676dcdc3a7c830228aa9 645e5774e2921598cd105516a04c6a8b b324ada5cd6bb14402c1e59200bd003a diff --git a/tests/queries/0_stateless/02361_fsync_profile_events.sh b/tests/queries/0_stateless/02361_fsync_profile_events.sh index 73bf3fa120a..8d8c69e32a5 100755 --- a/tests/queries/0_stateless/02361_fsync_profile_events.sh +++ b/tests/queries/0_stateless/02361_fsync_profile_events.sh @@ -46,7 +46,7 @@ for i in {1..100}; do # Non retriable errors if [[ $FileSync -ne 8 ]]; then - echo "FileSync: $FileSync != 8" >&2 + echo "FileSync: $FileSync != 9" >&2 exit 2 fi # Check that all files was synced diff --git a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference index 53bddb77b84..ed7af3b2f4f 100644 --- a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference +++ b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference @@ -1,13 +1,13 @@ 1000 10000 1000 10000 -test_02381 2000000 16112790 11904 16100886 -test_02381_compress 2000000 16099626 1658 16097968 +test_02381 2000000 16112832 11904 16100928 
+test_02381_compress 2000000 16099668 1658 16098010 10000 100000 10000 100000 10000 100000 10000 100000 -test_02381 4000000 28098334 2946 28095388 -test_02381_compress 4000000 28125412 23616 28101796 +test_02381 4000000 28098376 2946 28095430 +test_02381_compress 4000000 28125454 23616 28101838 1 Hello 2 World 1 Hello From 9ec59cff3c64a0a8661003dfa8e31d8f90c32ae1 Mon Sep 17 00:00:00 2001 From: MikhailBurdukov Date: Mon, 9 Sep 2024 10:08:54 +0000 Subject: [PATCH 3/3] Disable checksums for new file in parts --- src/Storages/MergeTree/DataPartsExchange.cpp | 3 ++- src/Storages/MergeTree/IMergeTreeDataPart.cpp | 3 +++ .../MergeTree/MergedBlockOutputStream.cpp | 24 ++++++++----------- ...ms_in_system_parts_columns_table.reference | 2 +- .../02253_empty_part_checksums.reference | 2 +- .../0_stateless/02361_fsync_profile_events.sh | 2 +- ...1_compress_marks_and_primary_key.reference | 8 +++---- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 061ee356203..a9990d38652 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -777,7 +777,8 @@ void Fetcher::downloadBaseOrProjectionPartToDisk( if (file_name != "checksums.txt" && file_name != "columns.txt" && file_name != IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME && - file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME) + file_name != IMergeTreeDataPart::METADATA_VERSION_FILE_NAME && + file_name != IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE) checksums.addFile(file_name, file_size, expected_hash); } diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index c30d80f2379..7fb18ce1ba7 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -1015,6 +1015,9 @@ NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const if (getDataPartStorage().exists(METADATA_VERSION_FILE_NAME)) result.emplace(METADATA_VERSION_FILE_NAME); + if (getDataPartStorage().exists(MIN_MAX_TIME_OF_DATA_INSERT_FILE)) + result.emplace(MIN_MAX_TIME_OF_DATA_INSERT_FILE); + return result; } diff --git a/src/Storages/MergeTree/MergedBlockOutputStream.cpp b/src/Storages/MergeTree/MergedBlockOutputStream.cpp index 3a00c5cf431..1e16ffb7fb1 100644 --- a/src/Storages/MergeTree/MergedBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergedBlockOutputStream.cpp @@ -315,20 +315,6 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis written_files.emplace_back(std::move(out)); } - { - auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, 4096, write_settings); - HashingWriteBuffer out_hashing(*out); - DB::writeIntText(*new_part->min_time_of_data_insert, out_hashing); - DB::writeText(" ", out_hashing); - DB::writeIntText(*new_part->max_time_of_data_insert, out_hashing); - out_hashing.finalize(); - checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_size = out_hashing.count(); - checksums.files[IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE].file_hash = out_hashing.getHash(); - - out->preFinalize(); - written_files.emplace_back(std::move(out)); - } - { /// Write a file with a description of columns. 
auto out = new_part->getDataPartStorage().writeFile("columns.txt", 4096, write_settings); @@ -345,6 +331,16 @@ MergedBlockOutputStream::WrittenFiles MergedBlockOutputStream::finalizePartOnDis written_files.emplace_back(std::move(out)); } + { + auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::MIN_MAX_TIME_OF_DATA_INSERT_FILE, 4096, write_settings); + DB::writeIntText(*new_part->min_time_of_data_insert, *out); + DB::writeText(" ", *out); + DB::writeIntText(*new_part->max_time_of_data_insert, *out); + + out->preFinalize(); + written_files.emplace_back(std::move(out)); + } + if (default_codec != nullptr) { auto out = new_part->getDataPartStorage().writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, write_settings); diff --git a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference index 4a896a46fed..4bf3cfe65a2 100644 --- a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference +++ b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference @@ -1 +1 @@ -20000101_1_1_0 test_00961 812036551f93a3685116fa6169d36fa9 2ca3d19eefed0ce7b4e7627e443b9a59 908ddf2b1d0af239da96ff1e527a8a1f +20000101_1_1_0 test_00961 e4ed027389c208d2b5fce9c4ef1ca42c 4c23d7f5920f89aefc3b062b646cd23d 908ddf2b1d0af239da96ff1e527a8a1f diff --git a/tests/queries/0_stateless/02253_empty_part_checksums.reference b/tests/queries/0_stateless/02253_empty_part_checksums.reference index 0f2374adab9..65a8c9ee65e 100644 --- a/tests/queries/0_stateless/02253_empty_part_checksums.reference +++ b/tests/queries/0_stateless/02253_empty_part_checksums.reference @@ -5,4 +5,4 @@ 0 1 0 -0_0_0_0 Wide 3f2fc7b294c1676dcdc3a7c830228aa9 645e5774e2921598cd105516a04c6a8b b324ada5cd6bb14402c1e59200bd003a +0_0_0_0 Wide 370db59d5dcaef5d762b11d319c368c7 514a8be2dac94fd039dbd230065e58a4 b324ada5cd6bb14402c1e59200bd003a diff --git a/tests/queries/0_stateless/02361_fsync_profile_events.sh b/tests/queries/0_stateless/02361_fsync_profile_events.sh index 8d8c69e32a5..d89f08da168 100755 --- a/tests/queries/0_stateless/02361_fsync_profile_events.sh +++ b/tests/queries/0_stateless/02361_fsync_profile_events.sh @@ -45,7 +45,7 @@ for i in {1..100}; do ")" # Non retriable errors - if [[ $FileSync -ne 8 ]]; then + if [[ $FileSync -ne 9 ]]; then echo "FileSync: $FileSync != 9" >&2 exit 2 fi diff --git a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference index ed7af3b2f4f..53bddb77b84 100644 --- a/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference +++ b/tests/queries/0_stateless/02381_compress_marks_and_primary_key.reference @@ -1,13 +1,13 @@ 1000 10000 1000 10000 -test_02381 2000000 16112832 11904 16100928 -test_02381_compress 2000000 16099668 1658 16098010 +test_02381 2000000 16112790 11904 16100886 +test_02381_compress 2000000 16099626 1658 16097968 10000 100000 10000 100000 10000 100000 10000 100000 -test_02381 4000000 28098376 2946 28095430 -test_02381_compress 4000000 28125454 23616 28101838 +test_02381 4000000 28098334 2946 28095388 +test_02381_compress 4000000 28125412 23616 28101796 1 Hello 2 World 1 Hello
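Taken together, the selection logic under `move_policy = by_insert_data_time` reduces to: order parts oldest-first by their (min, max) insert times and take them until the size that has to be freed is covered. A Python model of that behavior, mirroring `PartsComparatorByOldestData` and `LargestPartsWithRequiredSize` from the first patch (field and function names are illustrative, not the C++ API):

```python
from dataclasses import dataclass


@dataclass
class Part:
    name: str
    bytes_on_disk: int
    min_insert_time: int  # epoch seconds, as in min_time_of_data_insert
    max_insert_time: int


def select_parts_to_move(parts, required_size_sum):
    # Oldest data first: sort by (min, max) insert time ascending, then take
    # parts until their total size covers the space that must be freed.
    selected, total = [], 0
    for part in sorted(parts, key=lambda p: (p.min_insert_time, p.max_insert_time)):
        if total >= required_size_sum:
            break
        selected.append(part)
        total += part.bytes_on_disk
    return selected
```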