Merge pull request #5648 from yandex/remove-number-of-stat-calls

Reduce number of "stat" syscalls for MergeTree data parts
This commit is contained in:
alexey-milovidov 2019-06-17 09:30:29 +03:00 committed by GitHub
commit 5fd319d0e9
12 changed files with 128 additions and 28 deletions
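
The common thread across the files below is replacing per-file existence checks (each one a stat syscall via Poco::File::exists) with lookups in the data part's in-memory checksums map, which already lists every file the part contains. A minimal standalone sketch of that pattern, not taken from the commit (PartChecksums and hasStreamFiles are illustrative names, and the ".mrk" extension is hardcoded here whereas the real code uses storage.index_granularity_info.marks_file_extension):

#include <map>
#include <string>

/// Simplified stand-in for the part's checksums: every file written for the
/// part is a key in this map, so "does this file exist?" becomes a map lookup
/// instead of a filesystem call.
struct PartChecksums
{
    std::map<std::string, size_t> files;
};

bool hasStreamFiles(const PartChecksums & checksums, const std::string & stream_name)
{
    /// Old approach: Poco::File(path + stream_name + ".bin").exists() -- one stat(2) per check.
    /// New approach: consult metadata loaded once when the part was loaded.
    return checksums.files.count(stream_name + ".bin")
        && checksums.files.count(stream_name + ".mrk");
}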

View File

@@ -430,6 +430,7 @@ namespace ErrorCodes
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES = 453;
extern const int OPENSSL_ERROR = 454;
extern const int SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY = 455;
extern const int CANNOT_UNLINK = 458;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@@ -20,7 +20,7 @@ NameSet injectRequiredColumns(const MergeTreeData & storage, const MergeTreeData
const auto & column_name = columns[i];
/// column has files and hence does not require evaluation
if (part->hasColumnFiles(column_name))
if (part->hasColumnFiles(column_name, *storage.getColumn(column_name).type))
{
all_column_files_missing = false;
continue;

View File

@@ -1112,6 +1112,10 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
/// Removing of each data part before recursive removal of directory is to speed-up removal, because there will be less number of syscalls.
for (DataPartPtr part : data_parts_by_info) /// a copy intended
part->remove();
Poco::File(full_path).remove(true);
LOG_TRACE(log, "dropAllData: done.");
@@ -1323,7 +1327,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
if (!new_types.count(column.name))
{
/// The column was deleted.
if (!part || part->hasColumnFiles(column.name))
if (!part || part->hasColumnFiles(column.name, *column.type))
{
column.type->enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
@@ -1345,7 +1349,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
const String new_type_name = new_type->getName();
const auto * old_type = column.type.get();
if (!new_type->equals(*old_type) && (!part || part->hasColumnFiles(column.name)))
if (!new_type->equals(*old_type) && (!part || part->hasColumnFiles(column.name, *column.type)))
{
if (isMetadataOnlyConversion(old_type, new_type))
{

View File

@@ -34,6 +34,7 @@ namespace ErrorCodes
extern const int NOT_FOUND_EXPECTED_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int BAD_TTL_FILE;
extern const int CANNOT_UNLINK;
}
@@ -216,7 +217,7 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
for (const auto & column : storage_columns)
{
if (!hasColumnFiles(column.name))
if (!hasColumnFiles(column.name, *column.type))
continue;
const auto size = getColumnSize(column.name, *column.type).data_compressed;
@@ -395,7 +396,36 @@ void MergeTreeDataPart::remove() const
return;
}
to_dir.remove(true);
try
{
/// Remove each expected file in directory, then remove directory itself.
for (const auto & [file, _] : checksums.files)
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrno("Cannot unlink file " + path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
for (const auto & file : {"checksums.txt", "columns.txt"})
{
String path_to_remove = to + "/" + file;
if (0 != unlink(path_to_remove.c_str()))
throwFromErrno("Cannot unlink file " + path_to_remove, ErrorCodes::CANNOT_UNLINK);
}
if (0 != rmdir(to.c_str()))
throwFromErrno("Cannot rmdir file " + to, ErrorCodes::CANNOT_UNLINK);
}
catch (...)
{
/// Recursive directory removal does many excessive "stat" syscalls under the hood.
LOG_ERROR(storage.log, "Cannot quickly remove directory " << to << " by removing files; fallback to recursive removal. Reason: "
<< getCurrentExceptionMessage(false));
to_dir.remove(true);
}
}
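
The fast path above works because the directory's contents are fully known from the part's checksums. As a standalone illustration of the same strategy (the helper name is assumed; this is not the commit's code): unlink each expected file, then rmdir the directory. A generic recursive delete would instead readdir the directory and stat every entry, which is exactly the overhead the fallback branch accepts only when the fast path fails.

#include <string>
#include <vector>
#include <unistd.h>   /// unlink, rmdir

/// Remove a directory whose file names are known in advance.
/// Returns false if any syscall fails (e.g. an unexpected file is present),
/// in which case the caller can fall back to recursive removal.
bool removeDirectoryWithKnownFiles(const std::string & dir, const std::vector<std::string> & file_names)
{
    for (const auto & name : file_names)
        if (0 != ::unlink((dir + "/" + name).c_str()))
            return false;

    return 0 == ::rmdir(dir.c_str());
}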
@@ -858,16 +888,22 @@ void MergeTreeDataPart::checkConsistency(bool require_part_metadata)
}
}
bool MergeTreeDataPart::hasColumnFiles(const String & column) const
bool MergeTreeDataPart::hasColumnFiles(const String & column_name, const IDataType & type) const
{
/// NOTE: For multi-streams columns we check that just first file exist.
/// That's Ok under assumption that files exist either for all or for no streams.
bool res = true;
String prefix = getFullPath();
type.enumerateStreams([&](const IDataType::SubstreamPath & substream_path)
{
String file_name = IDataType::getFileNameForStream(column_name, substream_path);
String escaped_column = escapeForFileName(column);
return Poco::File(prefix + escaped_column + ".bin").exists()
&& Poco::File(prefix + escaped_column + storage.index_granularity_info.marks_file_extension).exists();
auto bin_checksum = checksums.files.find(file_name + ".bin");
auto mrk_checksum = checksums.files.find(file_name + storage.index_granularity_info.marks_file_extension);
if (bin_checksum == checksums.files.end() || mrk_checksum == checksums.files.end())
res = false;
}, {});
return res;
}
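
The new type parameter is needed because a single column can be serialized into several streams (for example, a Nullable column writes a null-map stream alongside the values stream), so the set of files to check can only be derived by enumerating the type's substreams. A toy illustration of that idea (the suffix list and helper names are hypothetical; the real code derives stream names via IDataType::enumerateStreams and getFileNameForStream):

#include <functional>
#include <string>
#include <vector>

/// Hypothetical: the stream name suffixes a Nullable column is written as.
/// The null map stream comes first, then the values stream.
std::vector<std::string> nullableStreamSuffixes()
{
    return {".null", ""};
}

/// A column "has files" only if every stream it is serialized into is present.
bool allStreamsPresent(const std::string & column, const std::function<bool(const std::string &)> & exists_in_checksums)
{
    for (const auto & suffix : nullableStreamSuffixes())
        if (!exists_in_checksums(column + suffix + ".bin"))
            return false;
    return true;
}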

View File

@@ -274,7 +274,7 @@ struct MergeTreeDataPart
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
/// Checks that .bin and .mrk files exist
bool hasColumnFiles(const String & column) const;
bool hasColumnFiles(const String & column, const IDataType & type) const;
/// For data in RAM ('index')
UInt64 getIndexSizeInBytes() const;

View File

@@ -129,12 +129,14 @@ MergeTreeReadTaskPtr MergeTreeReadPool::getTask(const size_t min_marks_to_read,
prewhere_info && prewhere_info->remove_prewhere_column, per_part_should_reorder[part_idx], std::move(curr_task_size_predictor));
}
MarkRanges MergeTreeReadPool::getRestMarks(const std::string & part_path, const MarkRange & from) const
MarkRanges MergeTreeReadPool::getRestMarks(const MergeTreeDataPart & part, const MarkRange & from) const
{
MarkRanges all_part_ranges;
/// Inefficient in presence of large number of data parts.
for (const auto & part_ranges : parts_ranges)
{
if (part_ranges.data_part->getFullPath() == part_path)
if (part_ranges.data_part.get() == &part)
{
all_part_ranges = part_ranges.ranges;
break;
@@ -142,7 +144,7 @@ MarkRanges MergeTreeReadPool::getRestMarks(const std::string & part_path, const
}
if (all_part_ranges.empty())
throw Exception("Trying to read marks range [" + std::to_string(from.begin) + ", " + std::to_string(from.end) + "] from part '"
+ part_path + "' which has no ranges in this query", ErrorCodes::LOGICAL_ERROR);
+ part.getFullPath() + "' which has no ranges in this query", ErrorCodes::LOGICAL_ERROR);
auto begin = std::lower_bound(all_part_ranges.begin(), all_part_ranges.end(), from, [] (const auto & f, const auto & s) { return f.begin < s.begin; });
if (begin == all_part_ranges.end())

View File

@@ -81,7 +81,7 @@ public:
void profileFeedback(const ReadBufferFromFileBase::ProfileInfo info);
/// This method tells which mark ranges we have to read if we start from @from mark range
MarkRanges getRestMarks(const std::string & part_path, const MarkRange & from) const;
MarkRanges getRestMarks(const MergeTreeDataPart & part, const MarkRange & from) const;
Block getHeader() const;

View File

@@ -44,9 +44,6 @@ MergeTreeReader::MergeTreeReader(const String & path,
{
try
{
if (!Poco::File(path).exists())
throw Exception("Part " + path + " is missing", ErrorCodes::NOT_FOUND_EXPECTED_DATA_PART);
for (const NameAndTypePair & column : columns)
addStreams(column.name, *column.type, profile_callback, clock_type);
}
@@ -163,7 +160,7 @@ void MergeTreeReader::addStreams(const String & name, const IDataType & type,
if (streams.count(stream_name))
return;
bool data_file_exists = Poco::File(path + stream_name + DATA_FILE_EXTENSION).exists();
bool data_file_exists = data_part->checksums.files.count(stream_name + DATA_FILE_EXTENSION);
/** If data file is missing then we will not try to open it.
* It is necessary since it allows to add new column to structure of the table without creating new files for old parts.

View File

@@ -74,7 +74,7 @@ bool MergeTreeThreadSelectBlockInputStream::getNewTask()
if (!reader)
{
auto rest_mark_ranges = pool->getRestMarks(path, task->mark_ranges[0]);
auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
if (use_uncompressed_cache)
owned_uncompressed_cache = storage.global_context.getUncompressedCache();
@@ -95,7 +95,7 @@ bool MergeTreeThreadSelectBlockInputStream::getNewTask()
/// in other case we can reuse readers, anyway they will be "seeked" to required mark
if (path != last_readed_part_path)
{
auto rest_mark_ranges = pool->getRestMarks(path, task->mark_ranges[0]);
auto rest_mark_ranges = pool->getRestMarks(*task->data_part, task->mark_ranges[0]);
/// retain avg_value_size_hints
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(), save_marks_in_cache,

View File

@@ -0,0 +1,30 @@
<test>
<name>merge_tree_many_partitions</name>
<type>loop</type>
<create_query>CREATE TABLE bad_partitions (x UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x</create_query>
<fill_query>INSERT INTO bad_partitions SELECT * FROM numbers(10000)</fill_query>
<stop_conditions>
<all_of>
<iterations>5</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>100</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<settings>
<max_partitions_per_insert_block>0</max_partitions_per_insert_block>
</settings>
<query>SELECT count() FROM bad_partitions</query>
<drop_query>DROP TABLE IF EXISTS bad_partitions</drop_query>
</test>

View File

@@ -0,0 +1,30 @@
<test>
<name>merge_tree_many_partitions_2</name>
<type>loop</type>
<create_query>CREATE TABLE bad_partitions (a UInt64, b UInt64, c UInt64, d UInt64, e UInt64, f UInt64, g UInt64, h UInt64, i UInt64, j UInt64, k UInt64, l UInt64, m UInt64, n UInt64, o UInt64, p UInt64, q UInt64, r UInt64, s UInt64, t UInt64, u UInt64, v UInt64, w UInt64, x UInt64, y UInt64, z UInt64) ENGINE = MergeTree PARTITION BY x ORDER BY x</create_query>
<fill_query>INSERT INTO bad_partitions (x) SELECT * FROM numbers(10000)</fill_query>
<stop_conditions>
<all_of>
<iterations>5</iterations>
<min_time_not_changing_for_ms>10000</min_time_not_changing_for_ms>
</all_of>
<any_of>
<iterations>100</iterations>
<total_time_ms>60000</total_time_ms>
</any_of>
</stop_conditions>
<main_metric>
<min_time/>
</main_metric>
<settings>
<max_partitions_per_insert_block>0</max_partitions_per_insert_block>
</settings>
<query>SELECT sum(ignore(*)) FROM bad_partitions</query>
<drop_query>DROP TABLE IF EXISTS bad_partitions</drop_query>
</test>

View File

@@ -18,9 +18,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1');
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 8 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table = 'test_local_1'; -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') WHERE _table in ('test_local_1', 'test_local_2');
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 8 }
SELECT * FROM merge(currentDatabase(), 'test_local_1') PREWHERE _table in ('test_local_1', 'test_local_2'); -- { serverError 16 }
SELECT '--------------Single Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_distributed_1');
@@ -36,9 +36,9 @@ SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _ta
SELECT *, _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') ORDER BY _table;
SELECT sum(value), _table FROM merge(currentDatabase(), 'test_local_1|test_local_2') GROUP BY _table ORDER BY _table;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table = 'test_local_1';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- {serverError 8}
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table = 'test_local_1'; -- { serverError 16 }
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') WHERE _table in ('test_local_1', 'test_local_2') ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- {serverError 8}
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_local_2') PREWHERE _table in ('test_local_1', 'test_local_2') ORDER BY value; -- { serverError 16 }
SELECT '--------------Local Merge Distributed------------';
SELECT * FROM merge(currentDatabase(), 'test_local_1|test_distributed_2') ORDER BY _table;
@@ -80,7 +80,7 @@ SELECT '--------------Implicit type conversion------------';
SELECT * FROM merge(currentDatabase(), 'test_s64_distributed|test_u64_distributed') ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_s64_distributed|test_u64_distributed') WHERE date = '2018-08-01' ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_s64_distributed|test_u64_distributed') WHERE _table = 'test_u64_distributed' ORDER BY value;
SELECT * FROM merge(currentDatabase(), 'test_s64_distributed|test_u64_distributed') WHERE value = 1; -- { serverError 171}
SELECT * FROM merge(currentDatabase(), 'test_s64_distributed|test_u64_distributed') WHERE value = 1; -- { serverError 171 }
DROP TABLE IF EXISTS test_u64_local;
DROP TABLE IF EXISTS test_s64_local;