diff --git a/src/Common/FileCacheSettings.cpp b/src/Common/FileCacheSettings.cpp index 02009d95550..f555de277b2 100644 --- a/src/Common/FileCacheSettings.cpp +++ b/src/Common/FileCacheSettings.cpp @@ -10,7 +10,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & max_size = config.getUInt64(config_prefix + ".data_cache_max_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE); max_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS); max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE); - cache_on_insert = config.getUInt64(config_prefix + ".cache_on_insert", false); + cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false); } } diff --git a/src/Common/FileCacheSettings.h b/src/Common/FileCacheSettings.h index c7956e48282..53c28400c86 100644 --- a/src/Common/FileCacheSettings.h +++ b/src/Common/FileCacheSettings.h @@ -12,7 +12,7 @@ struct FileCacheSettings size_t max_size = 0; size_t max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS; size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE; - bool cache_on_insert = false; + bool cache_on_write_operations = false; void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); }; diff --git a/src/Core/Settings.h b/src/Core/Settings.h index f8a0ea3c7e7..6d275ad6790 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -557,7 +557,7 @@ class IColumn; M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \ M(Bool, remote_fs_enable_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must me done via disk config), but allows to bypass cache for some queries if intended", 0) \ M(UInt64, remote_fs_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \ - M(Bool, remote_fs_cache_on_insert, false, "Write into cache on INSERT query", 0) \ + M(Bool, remote_fs_cache_on_write_operations, true, "Write into cache on INSERT query To actually work this setting requires be added to disk config too", 0) \ \ M(UInt64, http_max_tries, 10, "Max attempts to read via http.", 0) \ M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \ diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index de5397a87c2..568fbf160c0 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -169,7 +169,7 @@ DiskCacheWrapper::readFile( auto src_buffer = DiskDecorator::readFile(path, current_read_settings, read_hint, file_size); WriteSettings write_settings; - write_settings.remote_fs_cache_on_insert = false; + write_settings.remote_fs_cache_on_write_operations = false; auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite, write_settings); copyData(*src_buffer, *dst_buffer); @@ -206,7 +206,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode return DiskDecorator::writeFile(path, buf_size, mode, settings); WriteSettings current_settings = settings; - current_settings.remote_fs_cache_on_insert = false; + current_settings.remote_fs_cache_on_write_operations = false; LOG_TEST(log, "Write file {} to cache", backQuote(path)); diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index a9e58efbfb9..292699b5e22 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -292,8 +292,8 @@ std::unique_ptr DiskS3::writeFile(const String & path, }; bool cache_on_insert = fs::path(path).extension() != ".tmp" - && write_settings.remote_fs_cache_on_insert - && FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_insert; + && write_settings.remote_fs_cache_on_write_operations + && FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations; auto s3_buffer = std::make_unique( settings->client, diff --git a/src/IO/WriteSettings.h b/src/IO/WriteSettings.h index c34193574bb..81a6705cbab 100644 --- a/src/IO/WriteSettings.h +++ b/src/IO/WriteSettings.h @@ -5,7 +5,7 @@ namespace DB struct WriteSettings { - bool remote_fs_cache_on_insert = false; + bool remote_fs_cache_on_write_operations = false; }; } diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index a64ef3a88be..a3169f435e4 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -3203,7 +3203,7 @@ WriteSettings Context::getWriteSettings() const { WriteSettings res; - res.remote_fs_cache_on_insert = settings.remote_fs_cache_on_insert; + res.remote_fs_cache_on_write_operations = settings.remote_fs_cache_on_write_operations; return res; } diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 935a11ec5fa..22c7c6af83e 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -230,7 +230,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() case MergeAlgorithm::Vertical : { ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath()); - ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path())); + ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings()); ctx->rows_sources_write_buf = std::make_unique(*ctx->rows_sources_uncompressed_write_buf); MergeTreeDataPartInMemory::ColumnToSize local_merged_column_to_size; diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp index 0f71742fb09..2147575f1d5 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp @@ -43,7 +43,7 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse mutation version from file name, expected 'mutation_.txt', got '{}'", file_name_); } -MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number) +MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number, const WriteSettings & settings) : create_time(time(nullptr)) , commands(std::move(commands_)) , disk(std::move(disk_)) @@ -53,7 +53,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP { try { - auto out = disk->writeFile(path_prefix + file_name); + auto out = disk->writeFile(std::filesystem::path(path_prefix) / file_name, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings); *out << "format version: 1\n" << "create time: " << LocalDateTime(create_time) << "\n"; *out << "commands: "; diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.h b/src/Storages/MergeTree/MergeTreeMutationEntry.h index 7554a03836e..fa3a4058ae6 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.h +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.h @@ -29,7 +29,7 @@ struct MergeTreeMutationEntry String latest_fail_reason; /// Create a new entry and write it to a temporary file. - MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number); + MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number, const WriteSettings & settings); MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete; MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default; diff --git a/src/Storages/MergeTree/MergeTreePartition.cpp b/src/Storages/MergeTree/MergeTreePartition.cpp index 4edf23bc0fb..128c40929b3 100644 --- a/src/Storages/MergeTree/MergeTreePartition.cpp +++ b/src/Storages/MergeTree/MergeTreePartition.cpp @@ -390,16 +390,18 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis std::unique_ptr MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const { auto metadata_snapshot = storage.getInMemoryMetadataPtr(); - const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block; - return store(partition_key_sample, disk, part_path, checksums); + const auto & context = storage.getContext(); + const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, context).sample_block; + return store(partition_key_sample, disk, part_path, checksums, context->getWriteSettings()); } -std::unique_ptr MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const +std::unique_ptr MergeTreePartition::store( + const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const { if (!partition_key_sample) return nullptr; - auto out = disk->writeFile(part_path + "partition.dat"); + auto out = disk->writeFile(std::filesystem::path(part_path) / "partition.dat", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings); HashingWriteBuffer out_hashing(*out); for (size_t i = 0; i < value.size(); ++i) partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing); diff --git a/src/Storages/MergeTree/MergeTreePartition.h b/src/Storages/MergeTree/MergeTreePartition.h index f149fcbcb7e..3bd9202822f 100644 --- a/src/Storages/MergeTree/MergeTreePartition.h +++ b/src/Storages/MergeTree/MergeTreePartition.h @@ -41,7 +41,7 @@ public: /// Store functions return write buffer with written but not finalized data. /// User must call finish() for returned object. [[nodiscard]] std::unique_ptr store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const; - [[nodiscard]] std::unique_ptr store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const; + [[nodiscard]] std::unique_ptr store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const; void assign(const MergeTreePartition & other) { value = other.value; } diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 1fe701c54ae..9641299f1f8 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -419,14 +419,15 @@ void finalizeMutatedPart( const MergeTreeDataPartPtr & source_part, MergeTreeData::MutableDataPartPtr new_data_part, ExecuteTTLType execute_ttl_type, - const CompressionCodecPtr & codec) + const CompressionCodecPtr & codec, + ContextPtr context) { auto disk = new_data_part->volume->getDisk(); auto part_path = fs::path(new_data_part->getFullRelativePath()); if (new_data_part->uuid != UUIDHelpers::Nil) { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096); + auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings()); HashingWriteBuffer out_hashing(*out); writeUUIDText(new_data_part->uuid, out_hashing); new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count(); @@ -436,7 +437,7 @@ void finalizeMutatedPart( if (execute_ttl_type != ExecuteTTLType::NONE) { /// Write a file with ttl infos in json format. - auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096); + auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096, WriteMode::Rewrite, context->getWriteSettings()); HashingWriteBuffer out_hashing(*out_ttl); new_data_part->ttl_infos.write(out_hashing); new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count(); @@ -445,7 +446,7 @@ void finalizeMutatedPart( if (!new_data_part->getSerializationInfos().empty()) { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096); + auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings()); HashingWriteBuffer out_hashing(*out); new_data_part->getSerializationInfos().writeJSON(out_hashing); new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count(); @@ -454,18 +455,18 @@ void finalizeMutatedPart( { /// Write file with checksums. - auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096); + auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096, WriteMode::Rewrite, context->getWriteSettings()); new_data_part->checksums.write(*out_checksums); } /// close fd { - auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096); + auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings()); DB::writeText(queryToString(codec->getFullCodecDesc()), *out); } { /// Write a file with a description of columns. - auto out_columns = disk->writeFile(part_path / "columns.txt", 4096); + auto out_columns = disk->writeFile(part_path / "columns.txt", 4096, WriteMode::Rewrite, context->getWriteSettings()); new_data_part->getColumns().writeText(*out_columns); } /// close fd @@ -1162,7 +1163,7 @@ private: } } - MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec); + MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context); } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 5de1b959d7c..ae6d9e5474e 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -423,7 +423,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String { std::lock_guard lock(currently_processing_in_background_mutex); - MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get()); + MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), getContext()->getWriteSettings()); version = increment.get(); entry.commit(version); mutation_file_name = entry.file_name; diff --git a/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.reference b/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.reference index 3d0d9dadf6a..5bc2049204a 100644 --- a/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.reference +++ b/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.reference @@ -1,15 +1,15 @@ -- { echo } -SET remote_fs_cache_on_insert=1; +SET remote_fs_cache_on_write_operations=1; DROP TABLE IF EXISTS test; -CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3'; +CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache'; SYSTEM DROP REMOTE FILESYSTEM CACHE; SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical; SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path; 0 SELECT count() FROM system.remote_filesystem_cache; 0 -INSERT INTO test SELECT number, toString(number) FROM numbers(100); +INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=1; SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical; Row 1: ────── @@ -44,6 +44,17 @@ SELECT count() FROM system.remote_filesystem_cache; 7 SELECT count() FROM system.remote_filesystem_cache; 7 -INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes cache because now config setting is used +INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=0; SELECT count() FROM system.remote_filesystem_cache; 7 +INSERT INTO test SELECT number, toString(number) FROM numbers(100); +INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000); +SELECT count() FROM system.remote_filesystem_cache; +21 +OPTIMIZE TABLE test FINAL; +SELECT count() FROM system.remote_filesystem_cache; +24 +SET mutations_sync=2; +ALTER TABLE test UPDATE value = 'kek' WHERE key = 100; +SELECT count() FROM system.remote_filesystem_cache; +25 diff --git a/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.sql b/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.sql index dfcc617e0f4..58f4adb5980 100644 --- a/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.sql +++ b/tests/queries/0_stateless/02241_remote_filesystem_cache_on_insert.sql @@ -2,10 +2,10 @@ -- { echo } -SET remote_fs_cache_on_insert=1; +SET remote_fs_cache_on_write_operations=1; DROP TABLE IF EXISTS test; -CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3'; +CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache'; SYSTEM DROP REMOTE FILESYSTEM CACHE; @@ -13,7 +13,7 @@ SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, st SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path; SELECT count() FROM system.remote_filesystem_cache; -INSERT INTO test SELECT number, toString(number) FROM numbers(100); +INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=1; SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical; SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path; @@ -38,7 +38,15 @@ SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, re SELECT count() FROM system.remote_filesystem_cache; SELECT count() FROM system.remote_filesystem_cache; -INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes cache because now config setting is used +INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=0; SELECT count() FROM system.remote_filesystem_cache; +INSERT INTO test SELECT number, toString(number) FROM numbers(100); +INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000); +SELECT count() FROM system.remote_filesystem_cache; +OPTIMIZE TABLE test FINAL; +SELECT count() FROM system.remote_filesystem_cache; +SET mutations_sync=2; +ALTER TABLE test UPDATE value = 'kek' WHERE key = 100; +SELECT count() FROM system.remote_filesystem_cache;