Add Path chooser. Add DiskMonitort multiple disks support

This commit is contained in:
Igor Mineev 2019-04-01 21:41:19 +03:00
parent e7f87bb7cc
commit 8047d16a96
14 changed files with 239 additions and 154 deletions

View File

@ -5,15 +5,15 @@
namespace DB
{
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const ActiveDataPartSet::PartPathNames & names)
: format_version(format_version_)
{
for (const auto & name : names)
add(name);
for (const auto & path_name : names)
add(path_name.path, path_name.name);
}
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
bool ActiveDataPartSet::add(const String & path, const String & name, PartPathNames * out_replaced_parts)
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
@ -52,12 +52,12 @@ bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
part_info_to_name.erase(it++);
}
part_info_to_name.emplace(part_info, name);
part_info_to_name.emplace(part_info, PartPathName{path, name});
return true;
}
String ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const
ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const
{
auto it = getContainingPartImpl(part_info);
if (it != part_info_to_name.end())
@ -66,7 +66,7 @@ String ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info)
}
String ActiveDataPartSet::getContainingPart(const String & name) const
ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const String & name) const
{
auto it = getContainingPartImpl(MergeTreePartInfo::fromPartName(name, format_version));
if (it != part_info_to_name.end())
@ -75,7 +75,7 @@ String ActiveDataPartSet::getContainingPart(const String & name) const
}
std::map<MergeTreePartInfo, String>::const_iterator
std::map<MergeTreePartInfo, ActiveDataPartSet::PartPathName>::const_iterator
ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) const
{
/// A part can only be covered/overlapped by the previous or next one in `part_info_to_name`.
@ -97,7 +97,8 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co
return part_info_to_name.end();
}
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
ActiveDataPartSet::PartPathNames
ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
{
auto it_middle = part_info_to_name.lower_bound(part_info);
auto begin = it_middle;
@ -128,16 +129,16 @@ Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info
++end;
}
Strings covered;
PartPathNames covered;
for (auto it = begin; it != end; ++it)
covered.push_back(it->second);
return covered;
}
Strings ActiveDataPartSet::getParts() const
ActiveDataPartSet::PartPathNames ActiveDataPartSet::getParts() const
{
Strings res;
PartPathNames res;
res.reserve(part_info_to_name.size());
for (const auto & kv : part_info_to_name)
res.push_back(kv.second);

View File

@ -15,8 +15,15 @@ namespace DB
class ActiveDataPartSet
{
public:
struct PartPathName {
String path;
String name;
};
using PartPathNames = std::vector<PartPathName>;
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {}
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names);
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const PartPathNames & names);
ActiveDataPartSet(const ActiveDataPartSet & other)
: format_version(other.format_version)
@ -43,7 +50,7 @@ public:
/// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain
/// parts that were replaced from the set by the newly added part.
bool add(const String & name, Strings * out_replaced_parts = nullptr);
bool add(const String & path, const String & name, PartPathNames * out_replaced_parts = nullptr);
bool remove(const MergeTreePartInfo & part_info)
{
@ -56,13 +63,13 @@ public:
}
/// If not found, return an empty string.
String getContainingPart(const MergeTreePartInfo & part_info) const;
String getContainingPart(const String & name) const;
PartPathName getContainingPart(const MergeTreePartInfo & part_info) const;
PartPathName getContainingPart(const String & name) const;
Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
PartPathNames getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
/// Returns parts in ascending order of the partition_id and block number.
Strings getParts() const;
PartPathNames getParts() const;
size_t size() const;
@ -70,9 +77,9 @@ public:
private:
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, String> part_info_to_name;
std::map<MergeTreePartInfo, PartPathName> part_info_to_name;
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
std::map<MergeTreePartInfo, PartPathName>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
};
}

View File

@ -100,7 +100,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
{
String file_name = it.first;
String path = data.getFullPath() + part_name + "/" + file_name;
String path = part->getFullPath() + part_name + "/" + file_name;
UInt64 size = Poco::File(path).getSize();
@ -200,7 +200,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
String tmp_prefix = tmp_prefix_.empty() ? TMP_PREFIX : tmp_prefix_;
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
String absolute_part_path = data.getFullPath() + relative_part_path + "/";
String part_path = data.getFullPathForPart(0);
String absolute_part_path = part_path + relative_part_path + "/"; ///@TODO_IGR ASK path for file
Poco::File part_file(absolute_part_path);
if (part_file.exists())
@ -210,7 +211,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_name);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_path, part_name);
new_data_part->relative_path = relative_part_path;
new_data_part->is_temp = true;

View File

@ -3,8 +3,7 @@
namespace DB
{
UInt64 DiskSpaceMonitor::reserved_bytes;
UInt64 DiskSpaceMonitor::reservation_count;
std::map<String, DiskSpaceMonitor::DiskReserve> DiskSpaceMonitor::reserved;
std::mutex DiskSpaceMonitor::mutex;
}

View File

@ -34,6 +34,11 @@ namespace ErrorCodes
class DiskSpaceMonitor
{
public:
struct DiskReserve {
UInt64 reserved_bytes;
UInt64 reservation_count;
};
class Reservation : private boost::noncopyable
{
public:
@ -42,23 +47,23 @@ public:
try
{
std::lock_guard lock(DiskSpaceMonitor::mutex);
if (DiskSpaceMonitor::reserved_bytes < size)
if (reserves->reserved_bytes < size)
{
DiskSpaceMonitor::reserved_bytes = 0;
reserves->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size; it's a bug");
}
else
{
DiskSpaceMonitor::reserved_bytes -= size;
reserves->reserved_bytes -= size;
}
if (DiskSpaceMonitor::reservation_count == 0)
if (reserves->reservation_count == 0)
{
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count; it's a bug");
}
else
{
--DiskSpaceMonitor::reservation_count;
--reserves->reservation_count;
}
}
catch (...)
@ -71,9 +76,9 @@ public:
void update(UInt64 new_size)
{
std::lock_guard lock(DiskSpaceMonitor::mutex);
DiskSpaceMonitor::reserved_bytes -= size;
reserves->reserved_bytes -= size;
size = new_size;
DiskSpaceMonitor::reserved_bytes += size;
reserves->reserved_bytes += size;
}
UInt64 getSize() const
@ -81,17 +86,18 @@ public:
return size;
}
Reservation(UInt64 size_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size)
Reservation(UInt64 size_, DiskReserve * reserves_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_)
{
std::lock_guard lock(DiskSpaceMonitor::mutex);
DiskSpaceMonitor::reserved_bytes += size;
++DiskSpaceMonitor::reservation_count;
reserves->reserved_bytes += size;
++reserves->reservation_count;
}
private:
UInt64 size;
CurrentMetrics::Increment metric_increment;
DiskReserve * reserves;
};
using ReservationPtr = std::unique_ptr<Reservation>;
@ -110,6 +116,8 @@ public:
std::lock_guard lock(mutex);
auto & reserved_bytes = reserved[path].reserved_bytes;
if (reserved_bytes > res)
res = 0;
else
@ -118,16 +126,16 @@ public:
return res;
}
static UInt64 getReservedSpace()
static UInt64 getReservedSpace(const std::string & path)
{
std::lock_guard lock(mutex);
return reserved_bytes;
return reserved[path].reserved_bytes;
}
static UInt64 getReservationCount()
static UInt64 getReservationCount(const std::string & path)
{
std::lock_guard lock(mutex);
return reservation_count;
return reserved[path].reservation_count;
}
/// If not enough (approximately) space, throw an exception.
@ -137,12 +145,11 @@ public:
if (free_bytes < size)
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(free_bytes) + " available, "
+ formatReadableSizeWithBinarySuffix(size) + " requested", ErrorCodes::NOT_ENOUGH_SPACE);
return std::make_unique<Reservation>(size);
return std::make_unique<Reservation>(size, &reserved[path]);
}
private:
static UInt64 reserved_bytes;
static UInt64 reservation_count;
static std::map<String, DiskReserve> reserved;
static std::mutex mutex;
};

View File

@ -89,7 +89,7 @@ namespace ErrorCodes
MergeTreeData::MergeTreeData(
const String & database_, const String & table_,
const String & full_path_, const ColumnsDescription & columns_,
const Strings & full_paths_, const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
const String & date_column_name,
@ -110,7 +110,7 @@ MergeTreeData::MergeTreeData(
sample_by_ast(sample_by_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_path(full_path_),
full_paths(full_paths_),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
@ -159,13 +159,17 @@ MergeTreeData::MergeTreeData(
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
auto path_exists = Poco::File(full_path).exists();
auto format_path = full_paths[0]; ///@TODO_IGR ASK What path should we use for format file?
auto path_exists = Poco::File(format_path).exists();
for (const String & path : full_paths) {
/// Creating directories, if not exist.
Poco::File(full_path).createDirectories();
Poco::File(path).createDirectories();
Poco::File(full_path + "detached").createDirectory();
Poco::File(path + "detached").createDirectory();
}
String version_file_path = full_path + "format_version.txt";
String version_file_path = format_path + "format_version.txt";
auto version_file_exists = Poco::File(version_file_path).exists();
// When data path or file not exists, ignore the format_version check
if (!attach || !path_exists || !version_file_exists)
@ -625,15 +629,19 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
{
LOG_DEBUG(log, "Loading data parts");
Strings part_file_names;
std::vector<std::pair<String, size_t>> part_file_names;
Poco::DirectoryIterator end;
for (size_t i = 0; i != full_paths.size(); ++i)
{
auto&& full_path = full_paths[i];
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
/// Skip temporary directories.
if (startsWith(it.name(), "tmp"))
continue;
part_file_names.push_back(it.name());
part_file_names.emplace_back(it.name(), i);
}
}
DataPartsVector broken_parts_to_remove;
@ -643,13 +651,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
std::lock_guard lock(data_parts_mutex);
data_parts_indexes.clear();
for (const String & file_name : part_file_names)
for (const auto & part_file_name : part_file_names)
{
const String & file_name = part_file_name.first;
const size_t path_index = part_file_name.second;
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info, format_version))
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this, file_name, part_info);
MutableDataPartPtr part = std::make_shared<DataPart>(*this, full_paths[path_index], file_name, part_info);
part->relative_path = file_name;
bool broken = false;
@ -683,7 +693,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (part->info.level == 0)
{
/// It is impossible to restore level 0 parts.
LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it's impossible to repair.");
LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it's impossible to repair.");
broken_parts_to_remove.push_back(part);
}
else
@ -693,10 +703,12 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// delete it.
size_t contained_parts = 0;
LOG_ERROR(log, "Part " << full_path + file_name << " is broken. Looking for parts to replace it.");
LOG_ERROR(log, "Part " << full_paths[path_index] + file_name << " is broken. Looking for parts to replace it.");
for (const String & contained_name : part_file_names)
for (auto part_file_name : part_file_names)
{
const String & contained_name = part_file_name.first;
const size_t contained_path_index = part_file_name.second;
if (contained_name == file_name)
continue;
@ -706,19 +718,19 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (part->info.contains(contained_part_info))
{
LOG_ERROR(log, "Found part " << full_path + contained_name);
LOG_ERROR(log, "Found part " << full_paths[contained_path_index] + contained_name);
++contained_parts;
}
}
if (contained_parts >= 2)
{
LOG_ERROR(log, "Considering to remove broken part " << full_path + file_name << " because it covers at least 2 other parts");
LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it covers at least 2 other parts");
broken_parts_to_remove.push_back(part);
}
else
{
LOG_ERROR(log, "Detaching broken part " << full_path + file_name
LOG_ERROR(log, "Detaching broken part " << full_paths[path_index] + file_name
<< " because it covers less than 2 parts. You need to resolve this manually");
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
@ -728,7 +740,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue;
}
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
part->modification_time = Poco::File(full_paths[path_index] + file_name).getLastModified().epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
@ -830,26 +842,26 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
/// Delete temporary directories older than a day.
Poco::DirectoryIterator end;
for (auto && full_path : full_paths)
{
for (Poco::DirectoryIterator it{full_path}; it != end; ++it)
{
if (startsWith(it.name(), "tmp_"))
{
Poco::File tmp_dir(full_path + it.name());
try
{
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline))
{
try {
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline)) {
LOG_WARNING(log, "Removing temporary directory " << full_path << it.name());
Poco::File(full_path + it.name()).remove(true);
}
}
catch (const Poco::FileNotFoundException &)
{
catch (const Poco::FileNotFoundException &) {
/// If the file is already deleted, do nothing.
}
}
}
}
}
@ -965,15 +977,18 @@ void MergeTreeData::clearOldPartsFromFilesystem()
removePartsFinally(parts_to_remove);
}
void MergeTreeData::setPath(const String & new_full_path)
void MergeTreeData::setPath([[maybe_unused]] const String & new_full_path)
{
if (Poco::File{new_full_path}.exists())
throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
///@TODO_IGR ASK We can not implement this function. Remove it?
throw Exception{"this funcion does not implemeted yes", ErrorCodes::BAD_ARGUMENTS};
Poco::File(full_path).renameTo(new_full_path);
global_context.dropCaches();
full_path = new_full_path;
// if (Poco::File{new_full_path}.exists())
// throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
//
// Poco::File(full_path).renameTo(new_full_path);
//
// global_context.dropCaches();
// full_path = new_full_path;
}
void MergeTreeData::dropAllData()
@ -991,7 +1006,9 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
for (auto && full_path : full_paths) {
Poco::File(full_path).remove(true);
}
LOG_TRACE(log, "dropAllData: done.");
}
@ -1366,7 +1383,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
exception_message
<< ") need to be "
<< (forbidden_because_of_modify ? "modified" : "removed")
<< " in part " << part->name << " of table at " << full_path << ". Aborting just in case."
<< " in part " << part->name << " of table at " << part->path << ". Aborting just in case."
<< " If it is not an error, you could increase merge_tree/"
<< (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
<< " parameter in configuration file (current value: "
@ -1403,8 +1420,10 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
* will have old name of shared offsets for arrays.
*/
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
///@TODO_IGR ASK Why dont we use part->relative_path?
MergedColumnOnlyOutputStream out(
*this, in.getHeader(), full_path + part->name + '/', true /* sync */, compression_codec, true /* skip_offsets */, unused_written_offsets);
*this, in.getHeader(), part->path + part->name + '/', true /* sync */, compression_codec, true /* skip_offsets */, unused_written_offsets);
in.readPrefix();
out.writePrefix();
@ -1430,7 +1449,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
if (!part->checksums.empty())
{
transaction->new_checksums = new_checksums;
WriteBufferFromFile checksums_file(full_path + part->name + "/checksums.txt.tmp", 4096);
WriteBufferFromFile checksums_file(part->path + part->name + "/checksums.txt.tmp", 4096);
new_checksums.write(checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
@ -1438,7 +1457,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
/// Write the new column list to the temporary file.
{
transaction->new_columns = new_columns.filter(part->columns.getNames());
WriteBufferFromFile columns_file(full_path + part->name + "/columns.txt.tmp", 4096);
WriteBufferFromFile columns_file(part->path + part->name + "/columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
@ -1459,7 +1478,7 @@ void MergeTreeData::AlterDataPartTransaction::commit()
{
std::unique_lock<std::shared_mutex> lock(data_part->columns_lock);
String path = data_part->storage.full_path + data_part->name + "/";
String path = data_part->path + data_part->name + "/";
/// NOTE: checking that a file exists before renaming or deleting it
/// is justified by the fact that, when converting an ordinary column
@ -2133,9 +2152,9 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & relative_path)
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & path, const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this, Poco::Path(relative_path).getFileName());
MutableDataPartPtr part = std::make_shared<DataPart>(*this, path, Poco::Path(relative_path).getFileName());
part->relative_path = relative_path;
String full_part_path = part->getFullPath();
@ -2254,6 +2273,8 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
Poco::DirectoryIterator end;
for (const String & full_path : full_paths)
{
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
MergeTreePartInfo part_info;
@ -2263,12 +2284,12 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2)
{
for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2) {
const auto part_file_path = it2.path().absolute().toString();
size += Poco::File(part_file_path).getSize();
}
}
}
return size;
}
@ -2392,6 +2413,27 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
return res;
}
String MergeTreeData::getFullPathForPart(UInt64 expected_size) const
{
std::cerr << "Exp size " << expected_size << std::endl;
constexpr UInt64 SIZE_100MB = 100ull << 20;
constexpr UInt64 MAGIC_CONST = 1;
if (expected_size < SIZE_100MB) {
expected_size = SIZE_100MB;
}
for (const String & path : full_paths) {
UInt64 free_space = DiskSpaceMonitor::getUnreservedFreeSpace(path); ///@TODO_IGR ASK reserve? YES, we are
if (free_space > expected_size * MAGIC_CONST) {
std::cerr << "Choosed " << free_space << " " << path << std::endl;
return path;
}
}
std::cerr << "Choosed last " << full_paths[full_paths.size() - 1] << std::endl;
return full_paths[full_paths.size() - 1];
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
DataParts res;
@ -2577,7 +2619,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
Poco::Path dst_part_absolute_path = Poco::Path(full_path + tmp_dst_part_name).absolute();
///@TODO_IGR ASK Maybe flag that it is not recent part? Or choose same dir if it is possible
Poco::Path dst_part_absolute_path = Poco::Path(getFullPathForPart(src_part->bytes_on_disk) + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
if (Poco::File(dst_part_absolute_path).exists())
@ -2586,7 +2629,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, dst_part_name, dst_part_info);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, dst_part_storage_path, dst_part_name, dst_part_info);
dst_data_part->relative_path = tmp_dst_part_name;
dst_data_part->is_temp = true;

View File

@ -303,7 +303,7 @@ public:
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const String & database_, const String & table_,
const String & full_path_,
const Strings & full_paths_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
@ -363,7 +363,7 @@ public:
String getTableName() const { return table_name; }
String getFullPath() const { return full_path; }
String getFullPathForPart(UInt64 expected_size) const;
String getLogName() const { return log_name; }
@ -525,7 +525,7 @@ public:
Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & relative_path);
MutableDataPartPtr loadPartAndFixMetadata(const String & path, const String & relative_path);
/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
@ -633,7 +633,7 @@ private:
String database_name;
String table_name;
String full_path;
Strings full_paths;
/// Current column sizes in compressed and uncompressed form.
ColumnSizeByName column_sizes;

View File

@ -119,6 +119,14 @@ void FutureMergedMutatedPart::assign(MergeTreeData::DataPartsVector parts_)
name = part_info.getPartName();
}
UInt64 FutureMergedMutatedPart::expectedSize() const { ///TODO_IGR ASK sum size?
size_t size = 0;
for (auto &part : parts) {
size += part->getFileSizeOrZero(""); ///@TODO_IGR ASK file name?
}
return size;
}
MergeTreeDataMergerMutator::MergeTreeDataMergerMutator(MergeTreeData & data_, const BackgroundProcessingPool & pool_)
: data(data_), pool(pool_), log(&Logger::get(data.getLogName() + " (MergerMutator)"))
{
@ -150,7 +158,8 @@ UInt64 MergeTreeDataMergerMutator::getMaxSourcePartsSize(size_t pool_size, size_
data.settings.max_bytes_to_merge_at_max_space_in_pool,
static_cast<double>(free_entries) / data.settings.number_of_free_entries_in_pool_to_lower_max_size_of_merge);
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_path) / DISK_USAGE_COEFFICIENT_TO_SELECT));
///@TODO_IGR ASK what path?
return std::min(max_size, static_cast<UInt64>(DiskSpaceMonitor::getUnreservedFreeSpace(data.full_paths[0]) / DISK_USAGE_COEFFICIENT_TO_SELECT));
}
@ -290,8 +299,8 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
<< " because not enough free space: "
<< formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved "
<< "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace()) << " reserved in "
<< DiskSpaceMonitor::getReservationCount() << " chunks), "
<< "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getReservedSpace("")) << " reserved in " ///@TODO_IGR ASK RESERVED SPACE ON ALL DISKS?
<< DiskSpaceMonitor::getReservationCount("") << " chunks), "
<< formatReadableSizeWithBinarySuffix(sum_bytes)
<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< "% on overhead); suppressing similar warnings for the next hour");
@ -513,7 +522,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name);
String new_part_tmp_path = data.getFullPath() + TMP_PREFIX + future_part.name + "/";
size_t expected_size = future_part.expectedSize();
String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED SIZE
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
if (Poco::File(new_part_tmp_path).exists())
throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
@ -531,7 +543,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, future_part.name, future_part.part_info);
data, part_path, future_part.name, future_part.part_info);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
@ -853,8 +865,11 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
size_t expected_size = future_part.expectedSize();
String part_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK EXPECTED_SIZE
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, future_part.name, future_part.part_info);
data, part_path, future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;

View File

@ -30,6 +30,8 @@ struct FutureMergedMutatedPart
}
void assign(MergeTreeData::DataPartsVector parts_);
UInt64 expectedSize() const;
};
/** Can select the parts to merge and merge them.

View File

@ -136,8 +136,9 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
}
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String & name_)
: storage(storage_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String& path_, const String & name_)
///@TODO_IGR DO check is fromPartName need to use path
: storage(storage_), path(path_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
{
}
@ -233,7 +234,7 @@ String MergeTreeDataPart::getFullPath() const
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR);
return storage.full_path + relative_path + "/";
return path + relative_path + "/";
}
String MergeTreeDataPart::getNameWithPrefix() const
@ -355,8 +356,8 @@ void MergeTreeDataPart::remove() const
* And a race condition can happen that will lead to "File not found" error here.
*/
String from = storage.full_path + relative_path;
String to = storage.full_path + "delete_tmp_" + name;
String from = path + relative_path;
String to = path + "delete_tmp_" + name;
Poco::File from_dir{from};
Poco::File to_dir{to};
@ -396,7 +397,7 @@ void MergeTreeDataPart::remove() const
void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
{
String from = getFullPath();
String to = storage.full_path + new_relative_path + "/";
String to = path + new_relative_path + "/";
Poco::File from_file(from);
if (!from_file.exists())
@ -442,7 +443,7 @@ String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
res = dst_name();
if (!Poco::File(storage.full_path + res).exists())
if (!Poco::File(path + res).exists())
return res;
LOG_WARNING(storage.log, "Directory " << dst_name() << " (to detach to) is already exist."
@ -462,7 +463,8 @@ void MergeTreeDataPart::renameToDetached(const String & prefix) const
void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
Poco::Path src(getFullPath());
Poco::Path dst(storage.full_path + getRelativePathForDetachedPart(prefix));
Poco::Path dst(path + getRelativePathForDetachedPart(prefix));
///@TODO_IGR ASK What about another path?
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);
}

View File

@ -28,12 +28,12 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(const MergeTreeData & storage_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), name(name_), info(info_)
MergeTreeDataPart(const MergeTreeData & storage_, const String & path_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), path(path_), name(name_), info(info_)
{
}
MergeTreeDataPart(MergeTreeData & storage_, const String & name_);
MergeTreeDataPart(MergeTreeData & storage_, const String & path_, const String & name_);
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
@ -86,6 +86,7 @@ struct MergeTreeDataPart
const MergeTreeData & storage;
String path;
String name;
MergeTreePartInfo info;

View File

@ -161,7 +161,10 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_name, new_part_info);
size_t expected_size = block.bytes();
String part_absolute_path = data.getFullPathForPart(expected_size); ///@TODO_IGR ASK expected size
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_absolute_path, part_name, new_part_info);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;

View File

@ -62,10 +62,10 @@ StorageMergeTree::StorageMergeTree(
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: path(path_), database_name(database_name_), table_name(table_name_), full_path(path + escapeForFileName(table_name) + '/'),
: path(path_), database_name(database_name_), table_name(table_name_), full_paths{path + escapeForFileName(table_name) + '/', "/mnt/data/Data2/" + escapeForFileName(table_name) + '/'},
global_context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name,
full_path, columns_, indices_,
full_paths, columns_, indices_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, merging_params_, settings_, false, attach),
reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()),
@ -185,7 +185,7 @@ void StorageMergeTree::rename(const String & new_path_to_db, const String & /*ne
path = new_path_to_db;
table_name = new_table_name;
full_path = new_full_path;
full_paths = {new_full_path}; ///TODO_IGR ASK rename?
/// NOTE: Logger names are not updated.
}
@ -273,7 +273,7 @@ public:
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = DiskSpaceMonitor::reserve(storage.full_path, total_size); /// May throw.
reserved_space = DiskSpaceMonitor::reserve(storage.full_paths[0], total_size); /// May throw. @TODO_IGR ASK WHERE TO RESERVE
for (const auto & part : future_part.parts)
{
@ -333,7 +333,7 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
MergeTreeMutationEntry entry(commands, full_path, data.insert_increment.get());
MergeTreeMutationEntry entry(commands, full_paths[0], data.insert_increment.get()); ///@TODO_IGR ASK PATH TO ENTRY
String file_name;
{
std::lock_guard lock(currently_merging_mutex);
@ -426,11 +426,11 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
void StorageMergeTree::loadMutations()
{
Poco::DirectoryIterator end;
for (auto it = Poco::DirectoryIterator(full_path); it != end; ++it)
for (auto it = Poco::DirectoryIterator(full_paths[0]); it != end; ++it) ///@TODO_IGR ASK MUTATIONS FROM ALL DISKS?
{
if (startsWith(it.name(), "mutation_"))
{
MergeTreeMutationEntry entry(full_path, it.name());
MergeTreeMutationEntry entry(full_paths[0], it.name());
Int64 block_number = entry.block_number;
auto insertion = current_mutations_by_id.emplace(it.name(), std::move(entry));
current_mutations_by_version.emplace(block_number, insertion.first->second);
@ -481,7 +481,7 @@ bool StorageMergeTree::merge(
}
else
{
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
UInt64 disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS
selected = merger_mutator.selectAllPartsToMergeWithinPartition(future_part, disk_space, can_merge, partition_id, final, out_disable_reason);
}
@ -570,7 +570,7 @@ bool StorageMergeTree::tryMutatePart()
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMergingPartsTagger> tagger;
{
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_path);
auto disk_space = DiskSpaceMonitor::getUnreservedFreeSpace(full_paths[0]); ///@TODO_IGR ASK DISK OR DISKS
std::lock_guard lock(currently_merging_mutex);
@ -948,16 +948,19 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
String source_dir = "detached/";
/// Let's make a list of parts to add.
Strings parts;
ActiveDataPartSet::PartPathNames parts;
if (attach_part)
{
parts.push_back(partition_id);
for (const String & full_path : full_paths) {
parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id}); ///@TODO_IGR ASK
}
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
ActiveDataPartSet active_parts(data.format_version);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
for (const String & full_path : full_paths) {
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it) ///@TODO_IGR
{
const String & name = it.name();
MergeTreePartInfo part_info;
@ -967,20 +970,21 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
continue;
}
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
active_parts.add(full_path, name);
}
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
for (const auto & source_part_name : parts)
for (const auto & source_part : parts)
{
String source_path = source_dir + source_part_name;
String source_path = source_dir + source_part.name;
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_path);
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_part.path, source_part.name);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << source_path);
LOG_INFO(log, "Attaching part " << source_part.name << " from " << source_path);
data.renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");

View File

@ -90,7 +90,7 @@ public:
MergeTreeData & getData() { return data; }
const MergeTreeData & getData() const { return data; }
String getDataPath() const override { return full_path; }
String getDataPath() const override { return full_paths[0]; } ///@TODO_IGR ASK WHAT PATH
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
@ -107,7 +107,7 @@ private:
String path;
String database_name;
String table_name;
String full_path;
Strings full_paths;
Context global_context;
BackgroundProcessingPool & background_pool;