Cache on all write operations

kssenii 2022-03-23 19:00:42 +01:00
parent e39aba37a2
commit d2a3cfe5dc
16 changed files with 57 additions and 35 deletions

View File

@@ -10,7 +10,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
max_size = config.getUInt64(config_prefix + ".data_cache_max_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_CACHE_SIZE);
max_elements = config.getUInt64(config_prefix + ".data_cache_max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
cache_on_insert = config.getUInt64(config_prefix + ".cache_on_insert", false);
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
}
}
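For illustration, a minimal standalone sketch (not part of the commit) of how such a flag can be resolved from a Poco configuration. Only the key name comes from the hunk above; the XML fragment is hypothetical. Note that getUInt64 is used with a bool default, which works because false converts to 0:

#include <iostream>
#include <sstream>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

int main()
{
    /// Hypothetical disk configuration fragment.
    std::istringstream xml(
        "<disk>"
        "<cache_on_write_operations>1</cache_on_write_operations>"
        "</disk>");
    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    /// As in FileCacheSettings::loadFromConfig, an absent key falls back to the default.
    bool cache_on_write_operations = config->getUInt64("cache_on_write_operations", false);
    bool absent = config->getUInt64("no_such_key", false);
    std::cout << cache_on_write_operations << ' ' << absent << '\n'; /// prints: 1 0
}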

View File

@@ -12,7 +12,7 @@ struct FileCacheSettings
size_t max_size = 0;
size_t max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS;
size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
bool cache_on_insert = false;
bool cache_on_write_operations = false;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
};

View File

@@ -557,7 +557,7 @@ class IColumn;
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
M(Bool, remote_fs_enable_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must be done via disk config), but allows bypassing cache for some queries if intended", 0) \
M(UInt64, remote_fs_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
M(Bool, remote_fs_cache_on_insert, false, "Write into cache on INSERT query", 0) \
M(Bool, remote_fs_cache_on_write_operations, true, "Write into cache on INSERT query. To actually work, this setting must also be added to the disk config", 0) \
\
M(UInt64, http_max_tries, 10, "Max attempts to read via http.", 0) \
M(UInt64, http_retry_initial_backoff_ms, 100, "Min milliseconds for backoff, when retrying read via http", 0) \
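The M(...) rows above use the X-macro pattern: a single list of settings is expanded in different contexts (member declarations, registration, documentation). A toy sketch with the same row shape (type, name, default value, description, flags), independent of ClickHouse's actual macros:

#include <iostream>

#define APPLY_FOR_TOY_SETTINGS(M) \
    M(bool, remote_fs_cache_on_write_operations, true, "Write into cache on INSERT query", 0)

struct ToySettings
{
/// Expansion: declare one member per row, initialized with its default.
#define DECLARE_MEMBER(TYPE, NAME, DEFAULT, DESCRIPTION, FLAGS) TYPE NAME = DEFAULT;
    APPLY_FOR_TOY_SETTINGS(DECLARE_MEMBER)
#undef DECLARE_MEMBER
};

int main()
{
    ToySettings settings;
    std::cout << settings.remote_fs_cache_on_write_operations << '\n'; /// prints: 1
}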

View File

@@ -169,7 +169,7 @@ DiskCacheWrapper::readFile(
auto src_buffer = DiskDecorator::readFile(path, current_read_settings, read_hint, file_size);
WriteSettings write_settings;
write_settings.remote_fs_cache_on_insert = false;
write_settings.remote_fs_cache_on_write_operations = false;
auto dst_buffer = cache_disk->writeFile(tmp_path, settings.local_fs_buffer_size, WriteMode::Rewrite, write_settings);
copyData(*src_buffer, *dst_buffer);
@@ -206,7 +206,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
return DiskDecorator::writeFile(path, buf_size, mode, settings);
WriteSettings current_settings = settings;
current_settings.remote_fs_cache_on_insert = false;
current_settings.remote_fs_cache_on_write_operations = false;
LOG_TEST(log, "Write file {} to cache", backQuote(path));
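In both hunks the wrapper copies the caller's settings and forces the flag off before writing to the cache disk itself, so data being copied into the cache is not cached a second time. A self-contained illustration, with WriteSettings reduced to the single field this commit renames:

#include <cassert>

struct WriteSettings
{
    bool remote_fs_cache_on_write_operations = false;
};

int main()
{
    WriteSettings settings;
    settings.remote_fs_cache_on_write_operations = true;  /// the caller asked for caching

    WriteSettings current_settings = settings;            /// copy, as in DiskCacheWrapper::writeFile
    current_settings.remote_fs_cache_on_write_operations = false; /// writes into the cache disk never cache

    assert(!current_settings.remote_fs_cache_on_write_operations);
    assert(settings.remote_fs_cache_on_write_operations); /// the caller's settings are untouched
}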

View File

@@ -292,8 +292,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
};
bool cache_on_insert = fs::path(path).extension() != ".tmp"
&& write_settings.remote_fs_cache_on_insert
&& FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_insert;
&& write_settings.remote_fs_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(getCacheBasePath()).cache_on_write_operations;
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
settings->client,
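A condensed, compilable restatement of the condition above (the helper name is invented for the sketch): a write populates the cache only when the file is not temporary and both the query-level write setting and the disk-level cache setting are enabled.

#include <cassert>
#include <filesystem>

bool shouldCacheOnWrite(
    const std::filesystem::path & path,
    bool query_level,  /// WriteSettings::remote_fs_cache_on_write_operations
    bool disk_level)   /// FileCacheSettings::cache_on_write_operations
{
    return path.extension() != ".tmp" && query_level && disk_level;
}

int main()
{
    assert(shouldCacheOnWrite("data.bin", true, true));
    assert(!shouldCacheOnWrite("part.tmp", true, true));  /// temporary files bypass the cache
    assert(!shouldCacheOnWrite("data.bin", true, false)); /// the disk config must also enable it
}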

View File

@@ -5,7 +5,7 @@ namespace DB
struct WriteSettings
{
bool remote_fs_cache_on_insert = false;
bool remote_fs_cache_on_write_operations = false;
};
}

View File

@@ -3203,7 +3203,7 @@ WriteSettings Context::getWriteSettings() const
{
WriteSettings res;
res.remote_fs_cache_on_insert = settings.remote_fs_cache_on_insert;
res.remote_fs_cache_on_write_operations = settings.remote_fs_cache_on_write_operations;
return res;
}
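This small copy is the whole bridge between the two layers: the query-level setting (default true in Settings.h above) lands in the per-operation WriteSettings (default false), so code paths that never call getWriteSettings() keep caching disabled. A sketch reduced to the fields this commit touches:

struct Settings      { bool remote_fs_cache_on_write_operations = true;  };
struct WriteSettings { bool remote_fs_cache_on_write_operations = false; };

WriteSettings getWriteSettings(const Settings & settings)
{
    WriteSettings res;
    res.remote_fs_cache_on_write_operations = settings.remote_fs_cache_on_write_operations;
    return res;
}

int main()
{
    Settings query_settings;
    WriteSettings write_settings = getWriteSettings(query_settings);
    return write_settings.remote_fs_cache_on_write_operations ? 0 : 1; /// 0: the flag propagated
}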

View File

@@ -230,7 +230,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
case MergeAlgorithm::Vertical :
{
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath());
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()));
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings());
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);
MergeTreeDataPartInMemory::ColumnToSize local_merged_column_to_size;

View File

@@ -43,7 +43,7 @@ UInt64 MergeTreeMutationEntry::parseFileName(const String & file_name_)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot parse mutation version from file name, expected 'mutation_<UInt64>.txt', got '{}'", file_name_);
}
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number)
MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk_, const String & path_prefix_, UInt64 tmp_number, const WriteSettings & settings)
: create_time(time(nullptr))
, commands(std::move(commands_))
, disk(std::move(disk_))
@@ -53,7 +53,7 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP
{
try
{
auto out = disk->writeFile(path_prefix + file_name);
auto out = disk->writeFile(std::filesystem::path(path_prefix) / file_name, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings);
*out << "format version: 1\n"
<< "create time: " << LocalDateTime(create_time) << "\n";
*out << "commands: ";
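Besides threading the write settings through, this hunk switches from string concatenation to std::filesystem::path's operator/, which inserts a separator only when one is missing. A quick standalone check (the paths are hypothetical):

#include <filesystem>
#include <iostream>

int main()
{
    std::filesystem::path with_slash = "/var/lib/clickhouse/";    /// hypothetical path_prefix
    std::filesystem::path without_slash = "/var/lib/clickhouse";  /// hypothetical path_prefix
    /// Both print "/var/lib/clickhouse/mutation_42.txt" (operator<< quotes paths).
    std::cout << with_slash / "mutation_42.txt" << '\n';
    std::cout << without_slash / "mutation_42.txt" << '\n';
}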

View File

@@ -29,7 +29,7 @@ struct MergeTreeMutationEntry
String latest_fail_reason;
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number);
MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number, const WriteSettings & settings);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;

View File

@@ -390,16 +390,18 @@ void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & dis
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
return store(partition_key_sample, disk, part_path, checksums);
const auto & context = storage.getContext();
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, context).sample_block;
return store(partition_key_sample, disk, part_path, checksums, context->getWriteSettings());
}
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const
std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(
const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const
{
if (!partition_key_sample)
return nullptr;
auto out = disk->writeFile(part_path + "partition.dat");
auto out = disk->writeFile(std::filesystem::path(part_path) / "partition.dat", DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, settings);
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing);

View File

@@ -41,7 +41,7 @@ public:
/// Store functions return write buffer with written but not finalized data.
/// User must call finish() for returned object.
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const Block & partition_key_sample, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums, const WriteSettings & settings) const;
void assign(const MergeTreePartition & other) { value = other.value; }

View File

@@ -419,14 +419,15 @@ void finalizeMutatedPart(
const MergeTreeDataPartPtr & source_part,
MergeTreeData::MutableDataPartPtr new_data_part,
ExecuteTTLType execute_ttl_type,
const CompressionCodecPtr & codec)
const CompressionCodecPtr & codec,
ContextPtr context)
{
auto disk = new_data_part->volume->getDisk();
auto part_path = fs::path(new_data_part->getFullRelativePath());
if (new_data_part->uuid != UUIDHelpers::Nil)
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096);
auto out = disk->writeFile(part_path / IMergeTreeDataPart::UUID_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
writeUUIDText(new_data_part->uuid, out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::UUID_FILE_NAME].file_size = out_hashing.count();
@@ -436,7 +437,7 @@ void finalizeMutatedPart(
if (execute_ttl_type != ExecuteTTLType::NONE)
{
/// Write a file with ttl infos in json format.
auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096);
auto out_ttl = disk->writeFile(part_path / "ttl.txt", 4096, WriteMode::Rewrite, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out_ttl);
new_data_part->ttl_infos.write(out_hashing);
new_data_part->checksums.files["ttl.txt"].file_size = out_hashing.count();
@@ -445,7 +446,7 @@ void finalizeMutatedPart(
if (!new_data_part->getSerializationInfos().empty())
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096);
auto out = disk->writeFile(part_path / IMergeTreeDataPart::SERIALIZATION_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings());
HashingWriteBuffer out_hashing(*out);
new_data_part->getSerializationInfos().writeJSON(out_hashing);
new_data_part->checksums.files[IMergeTreeDataPart::SERIALIZATION_FILE_NAME].file_size = out_hashing.count();
@@ -454,18 +455,18 @@ void finalizeMutatedPart(
{
/// Write file with checksums.
auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096);
auto out_checksums = disk->writeFile(part_path / "checksums.txt", 4096, WriteMode::Rewrite, context->getWriteSettings());
new_data_part->checksums.write(*out_checksums);
} /// close fd
{
auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
auto out = disk->writeFile(part_path / IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, WriteMode::Rewrite, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
}
{
/// Write a file with a description of columns.
auto out_columns = disk->writeFile(part_path / "columns.txt", 4096);
auto out_columns = disk->writeFile(part_path / "columns.txt", 4096, WriteMode::Rewrite, context->getWriteSettings());
new_data_part->getColumns().writeText(*out_columns);
} /// close fd
@@ -1162,7 +1163,7 @@ private:
}
}
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec);
MutationHelpers::finalizeMutatedPart(ctx->source_part, ctx->new_data_part, ctx->execute_ttl_type, ctx->compression_codec, ctx->context);
}
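Each of the metadata files above (uuid, ttl.txt, serialization info, checksums.txt, the default codec, columns.txt) is now written via writeFile(..., 4096, WriteMode::Rewrite, context->getWriteSettings()), and several are funneled through a HashingWriteBuffer so their size and hash can be recorded in the part's checksums. A generic sketch of that wrap-and-count pattern (std::hash stands in for the real hash; this is an illustration, not ClickHouse's implementation):

#include <functional>
#include <iostream>
#include <string>

struct HashingWriter
{
    std::string & sink;    /// stands in for the underlying WriteBuffer
    size_t bytes = 0;
    size_t toy_hash = 0;

    void write(const std::string & data)
    {
        sink += data;                                /// forward the bytes
        bytes += data.size();                        /// track the size for checksums
        toy_hash ^= std::hash<std::string>{}(data);  /// toy running hash
    }
    size_t count() const { return bytes; }
};

int main()
{
    std::string file_contents;
    HashingWriter out_hashing{file_contents};
    out_hashing.write("format version: 1\n");
    std::cout << out_hashing.count() << '\n'; /// 18 bytes, to be recorded in checksums
}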

View File

@@ -423,7 +423,7 @@ Int64 StorageMergeTree::startMutation(const MutationCommands & commands, String
{
std::lock_guard lock(currently_processing_in_background_mutex);
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get());
MergeTreeMutationEntry entry(commands, disk, relative_data_path, insert_increment.get(), getContext()->getWriteSettings());
version = increment.get();
entry.commit(version);
mutation_file_name = entry.file_name;

View File

@@ -1,15 +1,15 @@
-- { echo }
SET remote_fs_cache_on_insert=1;
SET remote_fs_cache_on_write_operations=1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
SYSTEM DROP REMOTE FILESYSTEM CACHE;
SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical;
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
0
SELECT count() FROM system.remote_filesystem_cache;
0
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=1;
SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical;
Row 1:
──────
@@ -44,6 +44,17 @@ SELECT count() FROM system.remote_filesystem_cache;
7
SELECT count() FROM system.remote_filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes cache because now config setting is used
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=0;
SELECT count() FROM system.remote_filesystem_cache;
7
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000);
SELECT count() FROM system.remote_filesystem_cache;
21
OPTIMIZE TABLE test FINAL;
SELECT count() FROM system.remote_filesystem_cache;
24
SET mutations_sync=2;
ALTER TABLE test UPDATE value = 'kek' WHERE key = 100;
SELECT count() FROM system.remote_filesystem_cache;
25

View File

@@ -2,10 +2,10 @@
-- { echo }
SET remote_fs_cache_on_insert=1;
SET remote_fs_cache_on_write_operations=1;
DROP TABLE IF EXISTS test;
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';
CREATE TABLE test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3_cache';
SYSTEM DROP REMOTE FILESYSTEM CACHE;
@@ -13,7 +13,7 @@ SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, st
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
SELECT count() FROM system.remote_filesystem_cache;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=1;
SELECT file_segment_range, size, state FROM (SELECT file_segment_range, size, state, local_path FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path) WHERE endsWith(local_path, 'data.bin') FORMAT Vertical;
SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, remote_path FROM system.remote_data_paths ) AS data_paths INNER JOIN system.remote_filesystem_cache AS caches ON data_paths.cache_path = caches.cache_path;
@@ -38,7 +38,15 @@ SELECT count() FROM (SELECT arrayJoin(cache_paths) AS cache_path, local_path, re
SELECT count() FROM system.remote_filesystem_cache;
SELECT count() FROM system.remote_filesystem_cache;
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_insert=0; -- still writes cache because now config setting is used
INSERT INTO test SELECT number, toString(number) FROM numbers(100) SETTINGS remote_fs_cache_on_write_operations=0;
SELECT count() FROM system.remote_filesystem_cache;
INSERT INTO test SELECT number, toString(number) FROM numbers(100);
INSERT INTO test SELECT number, toString(number) FROM numbers(300, 10000);
SELECT count() FROM system.remote_filesystem_cache;
OPTIMIZE TABLE test FINAL;
SELECT count() FROM system.remote_filesystem_cache;
SET mutations_sync=2;
ALTER TABLE test UPDATE value = 'kek' WHERE key = 100;
SELECT count() FROM system.remote_filesystem_cache;