Merge pull request #2 from ObjatieGroba/questions

Add space reservation for each new MergeTreeDataPart.
This commit is contained in:
Igr 2019-04-03 15:54:01 +03:00 committed by GitHub
commit fab2c64110
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 187 additions and 58 deletions

View File

@ -16,6 +16,7 @@ class ActiveDataPartSet
{
public:
struct PartPathName
{
    /// Concatenating `path` and `name` (path + name) gives the absolute path to the DataPart.
    String path;
    String name;
};

View File

@ -30,6 +30,8 @@ namespace ErrorCodes
* Could "reserve" space, for different operations to plan disk space usage.
* Reservations are not separated for different filesystems,
* instead it is assumed, that all reservations are done within same filesystem.
*
* All disk paths must be registered in the map before MergeTreeData starts.
*/
class DiskSpaceMonitor
{
@ -86,8 +88,17 @@ public:
return size;
}
Reservation(UInt64 size_, DiskReserve * reserves_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_)
/// Read-only access to the path stored in this reservation.
const String & getPath() const
{
    return this->path;
}
/// Appends `dir` followed by a trailing '/' to the stored path.
void addEnclosedDirToPath(const String & dir)
{
    path += dir;
    path += '/';
}
Reservation(UInt64 size_, DiskReserve * reserves_, const String & path_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_),
path(path_)
{
std::lock_guard lock(DiskSpaceMonitor::mutex);
reserves->reserved_bytes += size;
@ -98,15 +109,16 @@ public:
UInt64 size;
CurrentMetrics::Increment metric_increment;
DiskReserve * reserves;
String path;
};
using ReservationPtr = std::unique_ptr<Reservation>;
static UInt64 getUnreservedFreeSpace(const std::string & path)
static UInt64 getUnreservedFreeSpace(const String & disk_path)
{
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
if (statvfs(disk_path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 res = fs.f_bfree * fs.f_bsize;
@ -116,7 +128,7 @@ public:
std::lock_guard lock(mutex);
auto & reserved_bytes = reserved[path].reserved_bytes;
auto & reserved_bytes = reserved[disk_path].reserved_bytes;
if (reserved_bytes > res)
res = 0;
@ -126,26 +138,60 @@ public:
return res;
}
static UInt64 getReservedSpace(const std::string & path)
/** Returns max of unreserved free space on all disks
* It is necessary to have guarantee that all paths are set
*/
static UInt64 getMaxUnreservedFreeSpace()
{
std::lock_guard lock(mutex);
return reserved[path].reserved_bytes;
UInt64 max_unreserved = 0;
for (auto& [disk_path, reserve] : reserved) {
struct statvfs fs;
if (statvfs(disk_path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 res = fs.f_bfree * fs.f_bsize;
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res -= std::min(res, static_cast<UInt64>(30 * (1ul << 20)));
///@TODO_IGR ASK Maybe mutex out of for
std::lock_guard lock(mutex);
auto &reserved_bytes = reserved[disk_path].reserved_bytes;
if (reserved_bytes > res)
res = 0;
else
res -= reserved_bytes;
max_unreserved = std::max(max_unreserved, res);
}
return max_unreserved;
}
static UInt64 getReservationCount(const std::string & path)
static UInt64 getReservedSpace(const String & disk_path)
{
std::lock_guard lock(mutex);
return reserved[path].reservation_count;
return reserved[disk_path].reserved_bytes;
}
/// If not enough (approximately) space, throw an exception.
static ReservationPtr reserve(const std::string & path, UInt64 size)
static UInt64 getReservationCount(const String & disk_path)
{
UInt64 free_bytes = getUnreservedFreeSpace(path);
std::lock_guard lock(mutex);
return reserved[disk_path].reservation_count;
}
/// If not enough (approximately) space, do not reserve.
static ReservationPtr tryToReserve(const String & disk_path, UInt64 size)
{
UInt64 free_bytes = getUnreservedFreeSpace(disk_path);
///@TODO_IGR ASK twice reservation?
if (free_bytes < size)
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, "
+ formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE);
return std::make_unique<Reservation>(size, &reserved[path]);
{
return {};
}
return std::make_unique<Reservation>(size, &reserved[disk_path], disk_path);
}
private:
@ -153,4 +199,61 @@ private:
static std::mutex mutex;
};
class Schema
{
class Volume {
friend class Schema;
public:
Volume(std::vector<String> paths_) : paths(std::move(paths_))
{
}
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const {
for (size_t i = 0; i != paths.size(); ++i) {
last_used = (last_used + 1) % paths.size();
auto reservation = DiskSpaceMonitor::tryToReserve(paths[last_used], expected_size);
if (reservation) {
return reservation;
}
}
return {};
}
private:
const Strings paths;
mutable size_t last_used = 0; ///@TODO_IGR ASK It is thread safe, but it is not consistent. :(
/// P.S. I do not want to use mutex here
};
public:
Schema(const std::vector<Strings> & disks) {
for (const Strings & volume : disks) {
volumes.emplace_back(volume);
}
}
///@TODO_IGR ASK maybe iterator without copy?
Strings getFullPaths() const {
Strings res;
for (const auto & volume : volumes) {
std::copy(volume.paths.begin(), volume.paths.end(), std::back_inserter(res));
}
return res;
}
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const {
for (auto & volume : volumes) {
auto reservation = volume.reserve(expected_size);
if (reservation) {
return reservation;
}
}
return {};
}
private:
std::vector<Volume> volumes;
};
}

View File

@ -89,7 +89,7 @@ namespace ErrorCodes
MergeTreeData::MergeTreeData(
const String & database_, const String & table_,
const Strings & full_paths_, const ColumnsDescription & columns_,
const Schema & schema_, const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
const String & date_column_name,
@ -110,7 +110,7 @@ MergeTreeData::MergeTreeData(
sample_by_ast(sample_by_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_paths(full_paths_),
schema(schema_),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
@ -159,7 +159,10 @@ MergeTreeData::MergeTreeData(
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
auto full_paths = getFullPaths();
auto format_path = full_paths[0]; ///@TODO_IGR ASK What path should we use for format file?
/// Use first disk. If format file not there move it.
auto path_exists = Poco::File(format_path).exists();
for (const String & path : full_paths) {
@ -629,6 +632,8 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, "Loading data parts");
const auto full_paths = getFullPaths();
std::vector<std::pair<String, size_t>> part_file_names;
Poco::DirectoryIterator end;
for (size_t i = 0; i != full_paths.size(); ++i)
@ -840,6 +845,8 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
? current_time - custom_directories_lifetime_seconds
: current_time - settings.temporary_directories_lifetime.totalSeconds();
const auto full_paths = getFullPaths();
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
for (auto && full_path : full_paths)
@ -1006,7 +1013,7 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
for (auto && full_path : full_paths) {
for (auto && full_path : getFullPaths()) {
Poco::File(full_path).remove(true);
}
@ -2273,6 +2280,8 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
Poco::DirectoryIterator end;
const auto full_paths = getFullPaths();
for (const String & full_path : full_paths)
{
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
@ -2413,7 +2422,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
return res;
}
String MergeTreeData::getFullPathForPart(UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size) const
{
std::cerr << "Exp size " << expected_size << std::endl;
constexpr UInt64 SIZE_100MB = 100ull << 20;
@ -2422,16 +2431,14 @@ String MergeTreeData::getFullPathForPart(UInt64 expected_size) const
if (expected_size < SIZE_100MB) {
expected_size = SIZE_100MB;
}
for (const String & path : full_paths) {
UInt64 free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path); ///@TODO_IGR ASK reserve? YES, we are
if (free_space > expected_size * MAGIC_CONST) {
std::cerr << "Choosed " << free_space << " " << path << std::endl;
return path;
}
auto reservation = reserveSpaceAtDisk(expected_size * MAGIC_CONST);
if (reservation) {
return reservation;
}
std::cerr << "Choosed last " << full_paths[full_paths.size() - 1] << std::endl;
return full_paths[full_paths.size() - 1];
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(expected_size) + " requested, "
+ formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getMaxUnreservedFreeSpace()) + " available",
ErrorCodes::NOT_ENOUGH_SPACE);
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
@ -2619,8 +2626,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
///@TODO_IGR ASK Maybe flag that it is not recent part? Or choose same dir if it is possible
Poco::Path dst_part_absolute_path = Poco::Path(getFullPathForPart(src_part->bytes_on_disk) + tmp_dst_part_name).absolute();
auto reservation = reserveSpaceForPart(src_part->bytes_on_disk);
String dst_part_path = reservation->getPath();
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
if (Poco::File(dst_part_absolute_path).exists())
@ -2629,7 +2637,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, dst_part_storage_path, dst_part_name, dst_part_info);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, dst_part_path, dst_part_name, dst_part_info);
dst_data_part->relative_path = tmp_dst_part_name;
dst_data_part->is_temp = true;
@ -2638,6 +2646,15 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
return dst_data_part;
}
/// Reserves `expected_size` bytes through the schema and, on success, appends the table
/// directory (table_name + '/') to the reservation's path. Returns an empty pointer if the
/// schema could not provide a reservation.
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size) const
{
    DiskSpaceMonitor::ReservationPtr result = schema.reserve(expected_size);
    if (!result)
        return result;

    /// Point the reservation at this table's directory on the chosen disk.
    /// TODO(IGR): confirm that table_name is the right directory name here; could the path differ?
    result->addEnclosedDirToPath(table_name);
    return result;
}
void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context)
{
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();

View File

@ -15,6 +15,7 @@
#include <DataStreams/GraphiteRollupSortedBlockInputStream.h>
#include <Storages/MergeTree/MergeTreeDataPart.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/DiskSpaceMonitor.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -286,7 +287,7 @@ public:
};
/// Attach the table corresponding to the directory in full_path (must end with /), with the given columns.
/// Attach the table corresponding to the directory in full_path inside schema (must end with /), with the given columns.
/// Correctness of names and paths is not checked.
///
/// date_column_name - if not empty, the name of the Date column used for partitioning by month.
@ -303,7 +304,7 @@ public:
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const String & database_, const String & table_,
const Strings & full_paths_,
const Schema & schema_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
@ -363,7 +364,7 @@ public:
String getTableName() const { return table_name; }
String getFullPathForPart(UInt64 expected_size) const;
DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size) const; ///@TODO_IGR ASK Is it realy const?
String getLogName() const { return log_name; }
@ -569,6 +570,17 @@ public:
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info);
DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size) const; ///@TODO_IGR ASK Maybe set this method as private?
/// Returns every path of the schema with table_name + '/' appended.
/// The list is rebuilt on every call.
/// TODO(IGR): this may be too slow; consider caching the result in a member.
Strings getFullPaths() const
{
    Strings table_paths = schema.getFullPaths();
    const String table_dir = table_name + '/';
    for (String & disk_path : table_paths)
        disk_path += table_dir;
    return table_paths;
}
MergeTreeDataFormatVersion format_version;
Context global_context;
@ -633,7 +645,8 @@ private:
String database_name;
String table_name;
Strings full_paths;
Schema schema;
/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;

View File

@ -119,14 +119,6 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
name = part_info.getPartName();
}
/// Adds up getFileSizeOrZero("") over all parts and returns the total.
/// NOTE(review): the empty file name looks like a placeholder - confirm what should be summed.
UInt64 FutureMergedMutatedPart::expectedSize() const
{
    size_t total = 0;
    for (const auto & source_part : parts)
        total += source_part->getFileSizeOrZero("");
    return total;
}
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
: data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (MergerMutator)"))
{
@ -522,8 +514,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name);
size_t expected_size = future_part.expectedSize();
String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED SIZE
String part_path = disk_reservation->getPath();
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
if (Poco::File(new_part_tmp_path).exists())
@ -830,7 +821,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry,
const Context & context)
const Context & context,
DiskSpaceMonitor::Reservation * disk_reservation)
{
auto check_not_cancelled = [&]()
{
@ -865,8 +857,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
size_t expected_size = future_part.expectedSize();
String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED_SIZE
String part_path = disk_reservation->getPath();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, part_path, future_part.name, future_part.part_info);

View File

@ -30,8 +30,6 @@ struct FutureMergedMutatedPart
}
void assign(MergeTreeData::DataPartsVector parts_);
UInt64 expectedSize() const;
};
/** Can select the parts to merge and merge them.
@ -98,7 +96,7 @@ public:
MergeTreeData::MutableDataPartPtr mutatePartToTemporaryPart(
const FutureMergedMutatedPart & future_part,
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry, const Context & context);
MergeListEntry & merge_entry, const Context & context, DiskSpaceMonitor::Reservation * disk_reservation);
MergeTreeData::DataPartPtr renameMergedTemporaryPart(
MergeTreeData::MutableDataPartPtr & new_data_part,

View File

@ -65,7 +65,7 @@ StorageMergeTree::StorageMergeTree(
: path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'},
global_context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
full_paths, columns_, indices_,
Schema(std::vector<Strings>{full_paths}), columns_, indices_, ///@TODO_IGR generate Schema from config
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, merging_params_, settings_, false, attach),
reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()),
@ -273,12 +273,15 @@ public:
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage.full_paths[0], total_size); /// May throw. @TODO_IGR ASK WHERE TO RESERVE
reserved_space = storage.data.reserveSpaceForPart(total_size);
if (!reserved_space) {
throw Exception("Not enought space", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR Edit exception msg
}
for (const auto & part : future_part.parts)
{
if (storage.currently_merging.count(part))
throw Exception("Tagging alreagy tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Tagging already tagged part " + part->name + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
}
storage.currently_merging.insert(future_part.parts.begin(), future_part.parts.end());
}
@ -481,7 +484,8 @@ bool StorageMergeTree::merge(
}
else
{
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS
/// A data part can be stored on only one disk, so take the maximum free space across all disks.
UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
@ -570,7 +574,8 @@ bool StorageMergeTree::tryMutatePart()
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS
/// A data part can be stored on only one disk, so take the maximum free space across all disks.
UInt64 disk_space = DiskSpaceMonitor::getMaxUnreservedFreeSpace(); ///@TODO_IGR ASK Maybe reserve max space at this disk there and then change to exactly space
std::lock_guard lock(currently_merging_mutex);
@ -660,7 +665,8 @@ bool StorageMergeTree::tryMutatePart()
try
{
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context);
new_part = merger_mutator.mutatePartToTemporaryPart(future_part, commands, *merge_entry, global_context,
tagger->reserved_space.get());
data.renameTempPartAndReplace(new_part);
tagger->is_successful = true;
write_part_log({});