New data path: disk_path + db_name + table_name. Removed default path from MergeTree Constructor

This commit is contained in:
Igor Mineev 2019-04-21 21:38:44 +03:00
parent d05c23bd1d
commit 79abe85328
20 changed files with 379 additions and 381 deletions

View File

@ -422,6 +422,8 @@ namespace ErrorCodes
extern const int CANNOT_MPROTECT = 445;
extern const int FUNCTION_NOT_ALLOWED = 446;
extern const int HYPERSCAN_CANNOT_SCAN_TEXT = 447;
extern const int UNKNOWN_SCHEMA = 448;
extern const int UNKNOWN_DISK = 449;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -146,6 +146,8 @@ struct ContextShared
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage schema chooser;
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
/// Storage schema chooser;
mutable std::unique_ptr<SchemaSelector> merge_tree_schema_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
@ -1651,18 +1653,42 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par
}
const Schema& Context::getSchema(const String & name) const
const DiskPtr & Context::getDisk(const String & name) const
{
auto lock = getLock();
const auto & disk_selector = getDiskSelector();
return disk_selector[name];
}
DiskSelector & Context::getDiskSelector() const
{
auto lock = getLock();
if (!shared->merge_tree_disk_selector)
{
constexpr auto config_name = "storage_configuration.disks";
auto & config = getConfigRef();
shared->merge_tree_disk_selector = std::make_unique<DiskSelector>(config, config_name, getPath());
}
return *shared->merge_tree_disk_selector;
}
const Schema & Context::getSchema(const String & name) const
{
auto lock = getLock();
if (!shared->merge_tree_schema_selector)
{
constexpr auto config_name = "storage_configuration";
constexpr auto config_name = "storage_configuration.schemes";
auto & config = getConfigRef();
shared->merge_tree_schema_selector = std::make_unique<SchemaSelector>(config, config_name);
shared->merge_tree_schema_selector = std::make_unique<SchemaSelector>(config, config_name, getDiskSelector());
}
return (*shared->merge_tree_schema_selector)[name];
}

View File

@ -427,8 +427,13 @@ public:
/// Lets you select the compression codec according to the conditions described in the configuration file.
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
DiskSelector & getDiskSelector() const;
/// Provides storage disks
const DiskPtr & getDisk(const String & name) const;
/// Provides storage politics schemes
const Schema& getSchema(const String & name) const;
const Schema & getSchema(const String & name) const;
/// Get the server uptime in seconds.
time_t getUptimeSeconds() const;

View File

@ -5,15 +5,15 @@
namespace DB
{
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const ActiveDataPartSet::PartPathNames & names)
ActiveDataPartSet::ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names)
: format_version(format_version_)
{
for (const auto & path_name : names)
add(path_name.path, path_name.name);
for (const auto & name : names)
add(name);
}
bool ActiveDataPartSet::add(const String & path, const String & name, PartPathNames * out_replaced_parts)
bool ActiveDataPartSet::add(const String & name, Strings * out_replaced_parts)
{
auto part_info = MergeTreePartInfo::fromPartName(name, format_version);
@ -52,12 +52,12 @@ bool ActiveDataPartSet::add(const String & path, const String & name, PartPathNa
part_info_to_name.erase(it++);
}
part_info_to_name.emplace(part_info, PartPathName{path, name});
part_info_to_name.emplace(part_info, name);
return true;
}
ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const
String ActiveDataPartSet::getContainingPart(const MergeTreePartInfo & part_info) const
{
auto it = getContainingPartImpl(part_info);
if (it != part_info_to_name.end())
@ -66,7 +66,7 @@ ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const Merge
}
ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const String & name) const
String ActiveDataPartSet::getContainingPart(const String & name) const
{
auto it = getContainingPartImpl(MergeTreePartInfo::fromPartName(name, format_version));
if (it != part_info_to_name.end())
@ -75,7 +75,7 @@ ActiveDataPartSet::PartPathName ActiveDataPartSet::getContainingPart(const Strin
}
std::map<MergeTreePartInfo, ActiveDataPartSet::PartPathName>::const_iterator
std::map<MergeTreePartInfo, String>::const_iterator
ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) const
{
/// A part can only be covered/overlapped by the previous or next one in `part_info_to_name`.
@ -97,8 +97,7 @@ ActiveDataPartSet::getContainingPartImpl(const MergeTreePartInfo & part_info) co
return part_info_to_name.end();
}
ActiveDataPartSet::PartPathNames
ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
Strings ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
{
auto it_middle = part_info_to_name.lower_bound(part_info);
auto begin = it_middle;
@ -129,16 +128,16 @@ ActiveDataPartSet::getPartsCoveredBy(const MergeTreePartInfo & part_info) const
++end;
}
PartPathNames covered;
Strings covered;
for (auto it = begin; it != end; ++it)
covered.push_back(it->second);
return covered;
}
ActiveDataPartSet::PartPathNames ActiveDataPartSet::getParts() const
Strings ActiveDataPartSet::getParts() const
{
PartPathNames res;
Strings res;
res.reserve(part_info_to_name.size());
for (const auto & kv : part_info_to_name)
res.push_back(kv.second);

View File

@ -15,17 +15,8 @@ namespace DB
class ActiveDataPartSet
{
public:
struct PartPathName
{
/// path + name is absolute path to DataPart
String path;
String name;
};
using PartPathNames = std::vector<PartPathName>;
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_) : format_version(format_version_) {}
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const PartPathNames & names);
ActiveDataPartSet(MergeTreeDataFormatVersion format_version_, const Strings & names);
ActiveDataPartSet(const ActiveDataPartSet & other)
: format_version(other.format_version)
@ -52,7 +43,7 @@ public:
/// Returns true if the part was actually added. If out_replaced_parts != nullptr, it will contain
/// parts that were replaced from the set by the newly added part.
bool add(const String & path, const String & name, PartPathNames * out_replaced_parts = nullptr);
bool add(const String & name, Strings * out_replaced_parts = nullptr);
bool remove(const MergeTreePartInfo & part_info)
{
@ -65,13 +56,13 @@ public:
}
/// If not found, return an empty string.
PartPathName getContainingPart(const MergeTreePartInfo & part_info) const;
PartPathName getContainingPart(const String & name) const;
String getContainingPart(const MergeTreePartInfo & part_info) const;
String getContainingPart(const String & name) const;
PartPathNames getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
Strings getPartsCoveredBy(const MergeTreePartInfo & part_info) const;
/// Returns parts in ascending order of the partition_id and block number.
PartPathNames getParts() const;
Strings getParts() const;
size_t size() const;
@ -79,9 +70,9 @@ public:
private:
MergeTreeDataFormatVersion format_version;
std::map<MergeTreePartInfo, PartPathName> part_info_to_name;
std::map<MergeTreePartInfo, String> part_info_to_name;
std::map<MergeTreePartInfo, PartPathName>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
std::map<MergeTreePartInfo, String>::const_iterator getContainingPartImpl(const MergeTreePartInfo & part_info) const;
};
}

View File

@ -201,7 +201,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
String relative_part_path = String(to_detached ? "detached/" : "") + tmp_prefix + part_name;
auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What size should be there?
String part_path = reservation->getPath();
String part_path = data.getFullPathOnDisk(reservation->getDisk2());
String absolute_part_path = part_path + relative_part_path + "/";
Poco::File part_file(absolute_part_path);
@ -212,7 +212,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
part_file.createDirectory();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_path, part_name);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk2(), part_name);
new_data_part->relative_path = relative_part_path;
new_data_part->is_temp = true;

View File

@ -3,13 +3,22 @@
#include <Common/escapeForFileName.h>
#include <Poco/File.h>
/// @TODO_IGR ASK Does such function already exists?
bool isAlphaNumeric(const std::string & s)
{
for (auto c : s)
if (!isalnum(c) && c != '_')
return false;
return true;
}
namespace DB
{
std::map<String, DiskSpaceMonitor::DiskReserve> DiskSpaceMonitor::reserved;
std::mutex DiskSpaceMonitor::mutex;
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, String default_path)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@ -17,31 +26,35 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
constexpr auto default_disk_name = "default";
for (const auto & disk_name : keys)
{
UInt64 keep_free_space_bytes = config.getUInt64(config_prefix + "." + disk_name + ".keep_free_space_bytes", 0);
if (!isAlphaNumeric(disk_name))
throw Exception("Disk name can contain only alphanumeric and '_' (" + disk_name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
auto disk_config_prefix = config_prefix + "." + disk_name;
UInt64 keep_free_space_bytes = config.getUInt64(disk_config_prefix + ".keep_free_space_bytes", 0);
String path;
if (config.has(config_prefix + "." + disk_name + ".path"))
path = config.getString(config_prefix + "." + disk_name + ".path");
if (config.has(disk_config_prefix + ".path"))
path = config.getString(disk_config_prefix + ".path");
if (disk_name == default_disk_name)
{
if (!path.empty())
///@TODO_IGR ASK Rename Default disk to smth? ClickHouse disk? DB disk?
throw Exception("It is not possible to specify default disk path", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
disks.emplace(disk_name, std::make_shared<const Disk>(disk_name, default_path, keep_free_space_bytes));
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + disk_name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
disks.emplace(disk_name, std::make_shared<const Disk>(disk_name, path, keep_free_space_bytes));
}
disks.emplace(disk_name, Disk(disk_name, path, keep_free_space_bytes));
}
}
const Disk & DiskSelector::operator[](const String & name) const
const DiskPtr & DiskSelector::operator[](const String & name) const
{
auto it = disks.find(name);
if (it == disks.end())
throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_DISK);
return it->second;
}
@ -51,15 +64,9 @@ bool DiskSelector::has(const String & name) const
return it != disks.end();
}
void DiskSelector::add(const Disk & disk)
void DiskSelector::add(const DiskPtr & disk)
{
disks.emplace(disk.getName(), Disk(disk.getName(), disk.getPath(), disk.getKeepingFreeSpace()));
}
Schema::Volume::Volume(std::vector<Disk> disks_)
{
for (const auto & disk : disks_)
disks.push_back(std::make_shared<Disk>(disk));
disks.emplace(disk->getName(), disk);
}
Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector)
@ -67,62 +74,58 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
Strings disks_names;
for (const auto & name : keys)
{
if (startsWith(name.data(), "disk"))
if (startsWith(name, "disk"))
{
disks_names.push_back(config.getString(config_prefix + "." + name));
auto disk_name = config.getString(config_prefix + "." + name);
disks.push_back(disk_selector[disk_name]);
}
else if (name == "part_size_threshold_bytes")
{
max_data_part_size = config.getUInt64(config_prefix + "." + name);
}
///@TODO_IGR ASK part_size_threshold_ratio which set max_data_part_size by total disk sizes?
}
if (max_data_part_size == 0)
--max_data_part_size;
if (disks.empty()) {
throw Exception("Volume must contain at least one disk", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
/// Get paths from disk's names
/// Disks operator [] may throw exception
for (const auto & disk_name : disks_names)
disks.push_back(std::make_shared<Disk>(disk_selector[disk_name]));
}
Schema::Volume::Volume(const Volume & other, const String & default_path, const String & enclosed_dir)
: max_data_part_size(other.max_data_part_size),
disks(other.disks),
last_used(0)
{
auto dir = escapeForFileName(enclosed_dir);
for (auto & disk : disks)
auto has_max_bytes = config.has(config_prefix + ".max_data_part_size_bytes");
auto has_max_ratio = config.has(config_prefix + ".max_data_part_size_ratio");
if (has_max_bytes && has_max_ratio)
{
if (disk->getName() == "default")
{
disk->SetPath(default_path + dir + '/');
}
else
{
disk->addEnclosedDirToPath(dir);
throw Exception("Only one of 'max_data_part_size_bytes' and 'max_data_part_size_ratio' should be specified",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
if (has_max_bytes) {
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes");
} else if (has_max_ratio) {
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_bytes");
if (ratio < 0 and ratio > 1) {
throw Exception("'max_data_part_size_bytes' have to be between 0 and 1",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
UInt64 sum_size = 0;
for (const auto & disk : disks)
sum_size += disk->getTotalSpace();
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio);
} else {
max_data_part_size = std::numeric_limits<UInt64>::max();
}
}
DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const
{
/// This volume can not store files which size greater than max_data_part_size
if (expected_size > max_data_part_size)
return {};
/// Real order is not necessary
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
for (size_t i = 0; i != disks.size(); ++i)
{
size_t index = (start_from + i) % disks.size();
auto reservation = DiskSpaceMonitor::tryToReserve(disks[index], expected_size);
if (reservation)
if (reservation && *reservation)
return reservation;
}
return {};
@ -131,31 +134,11 @@ DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) c
UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
///@TODO_IGR ASK There is cycle with mutex locking inside(((
for (const auto & disk : disks)
res = std::max(res, DiskSpaceMonitor::getUnreservedFreeSpace(disk));
return res;
}
void Schema::Volume::data_path_rename(const String & new_default_path, const String & new_data_dir_name, const String & old_data_dir_name)
{
for (auto & disk : disks)
{
auto old_path = disk->getPath();
if (disk->getName() == "default")
{
disk->SetPath(new_default_path + new_data_dir_name + '/');
Poco::File(old_path).renameTo(new_default_path + new_data_dir_name + '/');
}
else
{
auto new_path = old_path.substr(0, old_path.size() - old_data_dir_name.size() - 1) + new_data_dir_name + '/';
disk->SetPath(new_path);
Poco::File(old_path).renameTo(new_path);
}
}
}
Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks)
{
Poco::Util::AbstractConfiguration::Keys keys;
@ -163,20 +146,22 @@ Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::stri
for (const auto & name : keys)
{
if (!startsWith(name.data(), "volume"))
throw Exception("Unknown element in config: " + config_prefix + "." + name + ", must be 'volume'",\
if (!startsWith(name, "volume"))
throw Exception("Unknown element in config: " + config_prefix + "." + name + ", must be 'volume'",
ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
volumes.emplace_back(config, config_prefix + "." + name, disks);
}
if (volumes.empty()) {
throw Exception("Schema must contain at least one Volume", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
}
///@TODO_IGR ASK maybe iteratable object without copy?
Strings Schema::getFullPaths() const
Schema::Disks Schema::getDisks() const
{
Strings res;
Disks res;
for (const auto & volume : volumes)
for (const auto & disk : volume.disks)
res.push_back(disk->getPath());
res.push_back(disk);
return res;
}
@ -199,47 +184,31 @@ DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const
return {};
}
void Schema::data_path_rename(const String & new_default_path, const String & new_data_dir_name, const String & old_data_dir_name)
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks)
{
for (auto & volume : volumes)
volume.data_path_rename(new_default_path, new_data_dir_name, old_data_dir_name);
}
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, String config_prefix)
{
DiskSelector disks(config, config_prefix + ".disks");
constexpr auto default_disk_name = "default";
if (!disks.has(default_disk_name))
{
std::cerr << "No default disk settings" << std::endl;
disks.add(Disk(default_disk_name, "", 0));
}
config_prefix += ".schemes";
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
for (const auto & name : keys)
{
///@TODO_IGR ASK What if same names?
std::cerr << "Schema " + name << std::endl;
if (!isAlphaNumeric(name))
throw Exception("Schema name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
schemes.emplace(name, Schema{config, config_prefix + "." + name, disks});
LOG_INFO(&Logger::get("StatusFile"), "Storage schema " << name << "loaded"); ///@TODO_IGR ASK Logger?
}
constexpr auto default_schema_name = "default";
constexpr auto default_disk_name = "default";
if (schemes.find(default_schema_name) == schemes.end())
schemes.emplace(default_schema_name, Schema(Schema::Volumes{std::vector<Disk>{disks[default_disk_name]}}));
std::cerr << schemes.size() << " schemes loaded" << std::endl; ///@TODO_IGR ASK logs?
schemes.emplace(default_schema_name, Schema(Schema::Volumes{{std::vector<DiskPtr>{disks[default_disk_name]},
std::numeric_limits<UInt64>::max()}}));
}
const Schema & SchemaSelector::operator[](const String & name) const
{
auto it = schemes.find(name);
if (it == schemes.end())
throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); ///@TODO_IGR Choose error code
throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_SCHEMA);
return it->second;
}

View File

@ -26,19 +26,20 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int UNKNOWN_SCHEMA;
extern const int UNKNOWN_DISK;
}
/** path - Contain path to data on disk.
* May be different from configuration's path.
* Ends with /
* name - Unique key using for disk space reservation.
*/
class Disk
{
public:
Disk(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: name(name_),
path(path_),
Disk(String name_, String path_, UInt64 keep_free_space_bytes_)
: name(std::move(name_)),
path(std::move(path_)),
keep_free_space_bytes(keep_free_space_bytes_)
{
}
@ -58,14 +59,29 @@ public:
return keep_free_space_bytes;
}
void addEnclosedDirToPath(const String & dir)
UInt64 getTotalSpace() const
{
path += dir + '/';
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 size = fs.f_blocks * fs.f_bsize;
return size;
}
void SetPath(const String & path_)
UInt64 getAvailableSpace() const
{
path = path_;
struct statvfs fs;
if (statvfs(path.c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 size = fs.f_bfree * fs.f_bsize;
size -= std::min(size, keep_free_space_bytes);
return size;
}
private:
@ -74,7 +90,8 @@ private:
UInt64 keep_free_space_bytes;
};
using DiskPtr = std::shared_ptr<Disk>;
/// It is not possible to change disk runtime.
using DiskPtr = std::shared_ptr<const Disk>;
/** Determines amount of free space in filesystem.
@ -140,20 +157,37 @@ public:
return size;
}
const String & getPath() const
const DiskPtr & getDisk2() const ///@TODO_IGR rename
{
return disk_ptr->getPath();
return disk_ptr;
}
Reservation(UInt64 size_, DiskReserve * reserves_, DiskPtr disk_ptr_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), reserves(reserves_),
disk_ptr(disk_ptr_)
Reservation(UInt64 size_, DiskPtr disk_ptr_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), disk_ptr(std::move(disk_ptr_)) ///@TODO_IGR ASK DiskSpaceReservedForMerge?
{
auto unreserved = disk_ptr->getAvailableSpace();
LOG_INFO(&Logger::get("StatusFile"), "Reservation try: Unreserved " << unreserved << " ,size " << size);
std::lock_guard lock(DiskSpaceMonitor::mutex);
if (size > unreserved) {
/// Can not reserve, not enough space
///@TODO_IGR ASK metric_increment?
size = 0;
return;
}
reserves = &DiskSpaceMonitor::reserved[disk_ptr->getName()];
reserves->reserved_bytes += size;
++reserves->reservation_count;
}
/// Reservation valid when reserves not less then 1 byte
explicit operator bool() const noexcept {
return size != 0;
}
private:
UInt64 size;
CurrentMetrics::Increment metric_increment;
@ -165,17 +199,7 @@ public:
static UInt64 getUnreservedFreeSpace(const DiskPtr & disk_ptr)
{
struct statvfs fs;
if (statvfs(disk_ptr->getPath().c_str(), &fs) != 0)
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 res = fs.f_bfree * fs.f_bsize;
res -= std::min(res, disk_ptr->getKeepingFreeSpace()); ///@TODO_IGR ASK Is Heuristic by Michael Kolupaev actual?
/// Heuristic by Michael Kolupaev: reserve 30 MB more, because statvfs shows few megabytes more space than df.
res -= std::min(res, static_cast<UInt64>(30 * (1ul << 20)));
UInt64 res = disk_ptr->getAvailableSpace();
std::lock_guard lock(mutex);
@ -189,32 +213,30 @@ public:
return res;
}
static UInt64 getAllReservedSpace()
static std::vector<UInt64> getAllReservedSpace()
{
std::lock_guard lock(mutex);
UInt64 res = 0;
std::vector<UInt64> res;
for (const auto & reserve : reserved)
res += reserve.second.reserved_bytes;
res.push_back(reserve.second.reserved_bytes);
return res;
}
static UInt64 getAllReservationCount()
static std::vector<UInt64> getAllReservationCount()
{
std::lock_guard lock(mutex);
UInt64 res = 0;
std::vector<UInt64> res;
for (const auto & reserve : reserved)
res += reserve.second.reservation_count;
res.push_back(reserve.second.reservation_count);
return res;
}
/// If not enough (approximately) space, do not reserve.
/// If not, returns valid pointer
///@TODO_IGR ASK bla bla bla Reservation->operator bool()
static ReservationPtr tryToReserve(const DiskPtr & disk_ptr, UInt64 size)
{
UInt64 free_bytes = getUnreservedFreeSpace(disk_ptr);
///@TODO_IGR ASK twice reservation?
if (free_bytes < size)
return {};
return std::make_unique<Reservation>(size, &reserved[disk_ptr->getName()], disk_ptr);
return std::make_unique<Reservation>(size, disk_ptr);
}
private:
@ -226,55 +248,48 @@ private:
class DiskSelector
{
public:
DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, String default_path);
const Disk & operator[](const String & name) const;
const DiskPtr & operator[](const String & name) const;
bool has(const String & name) const;
void add(const Disk & disk);
void add(const DiskPtr & disk);
private:
std::map<String, Disk> disks;
std::map<String, DiskPtr> disks;
};
class Schema
{
public:
using Disks = std::vector<DiskPtr>;
class Volume
{
friend class Schema;
public:
/// Volume owns DiskPtrs
/// This means that there is no Volumes that share one DiskPtr
using Disks = std::vector<DiskPtr>;
static Disks CopyDisks(const Disks & disks)
{
Disks copy;
for (auto & disk_ptr : disks)
copy.push_back(std::make_shared<Disk>(*disk_ptr));
return copy;
}
Volume(std::vector<Disk> disks_);
Volume(std::vector<DiskPtr> disks_, UInt64 max_data_part_size_)
: max_data_part_size(max_data_part_size_), disks(std::move(disks_)) { }
Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector);
Volume(const Volume & other) : max_data_part_size(other.max_data_part_size), disks(CopyDisks(other.disks)) { ; }
Volume(const Volume & other) : max_data_part_size(other.max_data_part_size), disks(other.disks) { }
Volume & operator=(const Volume & other)
{
disks = CopyDisks(other.disks);
disks = other.disks;
max_data_part_size = other.max_data_part_size;
last_used.store(0, std::memory_order_relaxed);
return *this;
}
Volume(Volume && other) : max_data_part_size(other.max_data_part_size), disks(std::move(other.disks)) { ; }
Volume & operator=(Volume && other)
Volume(Volume && other) noexcept
: max_data_part_size(other.max_data_part_size), disks(std::move(other.disks)) { }
Volume & operator=(Volume && other) noexcept
{
disks = std::move(other.disks);
max_data_part_size = other.max_data_part_size;
@ -282,20 +297,15 @@ public:
return *this;
}
Volume(const Volume & other, const String & default_path, const String & enclosed_dir);
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const;
UInt64 getMaxUnreservedFreeSpace() const;
void data_path_rename(const String & new_default_path, const String & new_data_dir_name, const String & old_data_dir_name);
private:
UInt64 max_data_part_size;
UInt64 max_data_part_size = std::numeric_limits<decltype(max_data_part_size)>::max();
Disks disks;
mutable std::atomic<size_t> last_used = 0; ///@TODO_IGR ASK It is thread safe, but it is not consistent. :(
/// P.S. I do not want to use mutex here
mutable std::atomic<size_t> last_used = 0;
};
using Volumes = std::vector<Volume>;
@ -307,20 +317,12 @@ public:
Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks);
Schema(const Schema & other, const String & default_path, const String & enclosed_dir)
{
for (const auto & volume : other.volumes)
volumes.push_back(Volume(volume, default_path, enclosed_dir));
}
Strings getFullPaths() const;
Disks getDisks() const;
UInt64 getMaxUnreservedFreeSpace() const;
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const;
void data_path_rename(const String & new_default_path, const String & new_data_dir_name, const String & old_data_dir_name);
private:
Volumes volumes;
};
@ -328,7 +330,7 @@ private:
class SchemaSelector
{
public:
SchemaSelector(const Poco::Util::AbstractConfiguration & config, String config_prefix);
SchemaSelector(const Poco::Util::AbstractConfiguration & config, const String& config_prefix, const DiskSelector & disks);
const Schema & operator[](const String & name) const;

View File

@ -89,7 +89,6 @@ namespace ErrorCodes
MergeTreeData::MergeTreeData(
const String & database_, const String & table_,
const String & path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
@ -111,8 +110,7 @@ MergeTreeData::MergeTreeData(
sample_by_ast(sample_by_ast_),
require_part_metadata(require_part_metadata_),
database_name(database_), table_name(table_),
full_path(path_ + escapeForFileName(table_name) + '/'),
schema(context_.getSchema(settings.storage_schema_name), path_, escapeForFileName(table_name)), ///@TODO_IGR Schema name
schema(context_.getSchema(settings.storage_schema_name)),
broken_part_callback(broken_part_callback_),
log_name(database_name + "." + table_name), log(&Logger::get(log_name + " (Data)")),
data_parts_by_info(data_parts_indexes.get<TagByInfo>()),
@ -161,28 +159,41 @@ MergeTreeData::MergeTreeData(
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
auto path_exists = Poco::File(full_path).exists();
// format_file always contained on default disk
String version_file_path;
/// Creating directories, if not exist.
Poco::File(full_path).createDirectories();
for (const String & path : getFullPaths())
{
std::cerr << "Create path " << path << " by " << table_name << std::endl;
Poco::File(path).createDirectories();
Poco::File(path + "detached").createDirectory();
if (Poco::File{path + "format_version.txt"}.exists()) {
if (!version_file_path.empty())
{
LOG_ERROR(log, "Duplication of version file " << version_file_path << " and " << path << "format_file.txt");
throw Exception("Multiple format_version.txt file", ErrorCodes::CORRUPTED_DATA);
}
version_file_path = path + "format_version.txt";
}
}
// format_file always contained in default path
String version_file_path = full_path + "format_version.txt"; ///@TODO_IGR ASK What path should we use for format file?
/// If not choose any
if (version_file_path.empty()) {
version_file_path = schema.getDisks()[0]->getPath() + "format_version.txt";
}
///@TODO_IGR ASK LOGIC
auto version_file_exists = Poco::File(version_file_path).exists();
// When data path or file not exists, ignore the format_version check
if (!attach || !path_exists || !version_file_exists)
if (!attach || !version_file_exists)
{
format_version = min_format_version;
WriteBufferFromFile buf(version_file_path);
writeIntText(format_version.toUnderType(), buf);
}
else if (Poco::File(version_file_path).exists())
else if (version_file_exists)
{
ReadBufferFromFile buf(version_file_path);
UInt32 read_format_version;
@ -635,17 +646,17 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
const auto full_paths = getFullPaths();
std::vector<std::pair<String, size_t>> part_file_names;
std::vector<std::pair<String, DiskPtr>> part_names_with_disks;
Poco::DirectoryIterator end;
for (size_t i = 0; i != full_paths.size(); ++i)
for (auto disk_ptr : schema.getDisks())
{
for (Poco::DirectoryIterator it(full_paths[i]); it != end; ++it)
for (Poco::DirectoryIterator it(getFullPathOnDisk(disk_ptr)); it != end; ++it)
{
/// Skip temporary directories.
if (startsWith(it.name(), "tmp"))
continue;
part_file_names.emplace_back(it.name(), i);
part_names_with_disks.emplace_back(it.name(), disk_ptr);
}
}
@ -656,16 +667,15 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
auto lock = lockParts();
data_parts_indexes.clear();
for (const auto & part_file_name : part_file_names)
for (const auto & [part_name, part_disk_ptr] : part_names_with_disks)
{
const String & file_name = part_file_name.first;
const size_t path_index = part_file_name.second;
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(file_name, &part_info, format_version))
if (!MergeTreePartInfo::tryParsePartName(part_name, &part_info, format_version))
continue;
MutableDataPartPtr part = std::make_shared<DataPart>(*this, full_paths[path_index], file_name, part_info);
part->relative_path = file_name;
MutableDataPartPtr part = std::make_shared<DataPart>(*this, part_disk_ptr, part_name, part_info);
part->relative_path = part_name;
bool broken = false;
try
@ -698,7 +708,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (part->info.level == 0)
{
/// It is impossible to restore level 0 parts.
LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it's impossible to repair.");
LOG_ERROR(log, "Considering to remove broken part " << getFullPathOnDisk(part_disk_ptr) + part_name << " because it's impossible to repair.");
broken_parts_to_remove.push_back(part);
}
else
@ -708,11 +718,11 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// delete it.
size_t contained_parts = 0;
LOG_ERROR(log, "Part " << full_paths[path_index] + file_name << " is broken. Looking for parts to replace it.");
LOG_ERROR(log, "Part " << getFullPathOnDisk(part_disk_ptr) + part_name << " is broken. Looking for parts to replace it.");
for (const auto & [contained_name, full_path_index] : part_file_names)
for (const auto & [contained_name, contained_disk_ptr] : part_names_with_disks)
{
if (contained_name == file_name)
if (contained_name == part_name)
continue;
MergeTreePartInfo contained_part_info;
@ -721,19 +731,19 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
if (part->info.contains(contained_part_info))
{
LOG_ERROR(log, "Found part " << full_paths[full_path_index] + contained_name);
LOG_ERROR(log, "Found part " << getFullPathOnDisk(contained_disk_ptr) + contained_name);
++contained_parts;
}
}
if (contained_parts >= 2)
{
LOG_ERROR(log, "Considering to remove broken part " << full_paths[path_index] + file_name << " because it covers at least 2 other parts");
LOG_ERROR(log, "Considering to remove broken part " << getFullPathOnDisk(part_disk_ptr) + part_name << " because it covers at least 2 other parts");
broken_parts_to_remove.push_back(part);
}
else
{
LOG_ERROR(log, "Detaching broken part " << full_paths[path_index] + file_name
LOG_ERROR(log, "Detaching broken part " << getFullPathOnDisk(part_disk_ptr) + part_name
<< " because it covers less than 2 parts. You need to resolve this manually");
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;
@ -743,7 +753,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
continue;
}
part->modification_time = Poco::File(full_paths[path_index] + file_name).getLastModified().epochTime();
part->modification_time = Poco::File(getFullPathOnDisk(part_disk_ptr) + part_name).getLastModified().epochTime();
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
@ -860,7 +870,7 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline))
{
LOG_WARNING(log, "Removing temporary directory " << full_data_path << it.name());
Poco::File(full_path + it.name()).remove(true);
Poco::File(full_data_path + it.name()).remove(true);
}
}
catch (const Poco::FileNotFoundException &)
@ -985,39 +995,30 @@ void MergeTreeData::clearOldPartsFromFilesystem()
removePartsFinally(parts_to_remove);
}
void MergeTreeData::rename(const String & new_path, const String & new_table_name)
void MergeTreeData::rename(const String & new_database_name, const String & new_table_name)
{
/// It is possible to change default disk path
/// It is impossible to change another disk path here, but possible to change table_name there
auto old_file_db_name = escapeForFileName(database_name);
auto new_file_db_name = escapeForFileName(new_database_name);
auto old_file_table_name = escapeForFileName(table_name);
auto new_file_table_name = escapeForFileName(new_table_name);
auto full_paths = getFullPaths();
for (const auto & full_data_path : full_paths)
for (const auto & disk : schema.getDisks())
{
auto new_full_path = full_data_path.substr(0, full_data_path.size() - old_file_table_name.size() - 1) + new_file_table_name + '/';
auto new_full_path = disk->getPath() + new_file_db_name + '/' + new_file_table_name + '/';
if (Poco::File{new_full_path}.exists())
throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
}
auto new_full_path = new_path + new_file_table_name + '/';
if (Poco::File{new_full_path}.exists())
throw Exception{"Target path already exists: " + new_full_path, ErrorCodes::DIRECTORY_ALREADY_EXISTS};
/// Everything is fine. Rename
schema.data_path_rename(new_path, new_file_table_name, old_file_table_name);
/// If default path doesn't store data
if (!Poco::File(new_full_path).exists())
for (const auto & disk : schema.getDisks())
{
std::cerr << "default path doesn't store data" << std::endl;
Poco::File(full_path).renameTo(new_full_path);
auto full_path = disk->getPath() + old_file_db_name + '/' + old_file_table_name + '/';
auto new_full_path = disk->getPath() + new_file_db_name + '/' + new_file_table_name + '/';
Poco::File{full_path}.renameTo(new_full_path);
}
global_context.dropCaches();
full_path = new_full_path;
///@TODO_IGR ASK We have not did it yet
database_name = new_database_name;
table_name = new_table_name;
}
@ -1036,8 +1037,11 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
for (auto && full_data_path : getFullPaths())
for (auto && full_data_path : getFullPaths()) {
Poco::File(full_data_path).remove(true);
std::cerr << full_data_path << " removed by " << table_name << std::endl;
}
LOG_TRACE(log, "dropAllData: done.");
}
@ -1412,7 +1416,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
exception_message
<< ") need to be "
<< (forbidden_because_of_modify ? "modified" : "removed")
<< " in part " << part->name << " of table at " << part->full_path << ". Aborting just in case."
<< " in part " << part->name << " of table at " << part->getFullPath() << ". Aborting just in case."
<< " If it is not an error, you could increase merge_tree/"
<< (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
<< " parameter in configuration file (current value: "
@ -2181,9 +2185,9 @@ MergeTreeData::DataPartPtr MergeTreeData::getPartIfExists(const String & part_na
}
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const String & path, const String & relative_path)
MergeTreeData::MutableDataPartPtr MergeTreeData::loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path)
{
MutableDataPartPtr part = std::make_shared<DataPart>(*this, path, Poco::Path(relative_path).getFileName());
MutableDataPartPtr part = std::make_shared<DataPart>(*this, disk, Poco::Path(relative_path).getFileName());
part->relative_path = relative_path;
String full_part_path = part->getFullPath();
@ -2416,7 +2420,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getAllDataPartsVector(MergeTreeDat
return res;
}
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size)
{
// std::cerr << "Exp size " << expected_size << std::endl;
constexpr UInt64 SIZE_1MB = 1ull << 20; ///@TODO_IGR ASK Is it OK?
@ -2620,7 +2624,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpaceForPart(src_part->bytes_on_disk);
String dst_part_path = reservation->getPath();
String dst_part_path = getFullPathOnDisk(reservation->getDisk2());
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();
@ -2630,7 +2634,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
LOG_DEBUG(log, "Cloning part " << src_part_absolute_path.toString() << " to " << dst_part_absolute_path.toString());
localBackup(src_part_absolute_path, dst_part_absolute_path);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, dst_part_path, dst_part_name, dst_part_info);
MergeTreeData::MutableDataPartPtr dst_data_part = std::make_shared<MergeTreeData::DataPart>(*this, reservation->getDisk2(), dst_part_name, dst_part_info);
dst_data_part->relative_path = tmp_dst_part_name;
dst_data_part->is_temp = true;
@ -2639,11 +2643,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
return dst_data_part;
}
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size) const
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size)
{
return schema.reserve(expected_size);
}
String MergeTreeData::getFullPathOnDisk(const DiskPtr & disk) const {
return disk->getPath() + escapeForFileName(database_name) + '/' + escapeForFileName(table_name) + '/';
}
Strings MergeTreeData::getFullPaths() const {
Strings res;
for (const auto & disk : schema.getDisks()) {
res.push_back(getFullPathOnDisk(disk));
}
return res;
}
void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, const Context & context)
{
String clickhouse_path = Poco::Path(context.getPath()).makeAbsolute().toString();

View File

@ -304,7 +304,6 @@ public:
/// require_part_metadata - should checksums.txt and columns.txt exist in the part directory.
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const String & database_, const String & table_,
const String & path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
@ -364,8 +363,6 @@ public:
String getTableName() const { return table_name; }
DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size) const; ///@TODO_IGR ASK Is it realy const?
String getLogName() const { return log_name; }
/// Returns a copy of the list so that the caller shouldn't worry about locks.
@ -476,7 +473,7 @@ public:
/// Moves the entire data directory.
/// Flushes the uncompressed blocks cache and the marks cache.
/// Must be called with locked lockStructureForAlter().
void rename(const String & new_path, const String & new_table_name);
void rename(const String & new_database_name, const String & new_table_name);
/// Check if the ALTER can be performed:
/// - all needed columns are present.
@ -527,7 +524,7 @@ public:
Names getColumnsRequiredForSampling() const { return columns_required_for_sampling; }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const String & path, const String & relative_path);
MutableDataPartPtr loadPartAndFixMetadata(const DiskPtr & disk, const String & relative_path);
/** Create local backup (snapshot) for parts with specified prefix.
* Backup is created in directory clickhouse_dir/shadow/i/, where i - incremental number,
@ -567,9 +564,13 @@ public:
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPart(const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info);
DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size) const; ///@TODO_IGR ASK Maybe set this method as private?
String getFullPathOnDisk(const DiskPtr & disk) const;
Strings getFullPaths() const { return schema.getFullPaths(); }
Strings getFullPaths() const;
DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size);
DiskSpaceMonitor::ReservationPtr reserveSpaceForPart(UInt64 expected_size);
MergeTreeDataFormatVersion format_version;
@ -636,10 +637,6 @@ private:
String database_name;
String table_name;
/// Defalt storage path. Always contain format_version.txt
/// Can contain data if specified in schema
String full_path;
Schema schema;
/// Current column sizes in compressed and uncompressed form.

View File

@ -289,9 +289,7 @@ bool MergeTreeDataMergerMutator::selectAllPartsToMergeWithinPartition(
disk_space_warning_time = now;
LOG_WARNING(log, "Won't merge parts from " << parts.front()->name << " to " << (*prev_it)->name
<< " because not enough free space: "
<< formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved "
<< "(" << formatReadableSizeWithBinarySuffix(DiskSpaceMonitor::getAllReservedSpace()) << " reserved in " ///@TODO_IGR ASK RESERVED SPACE ON ALL DISKS?
<< DiskSpaceMonitor::getAllReservationCount() << " chunks at all disks), "
<< formatReadableSizeWithBinarySuffix(available_disk_space) << " free and unreserved, "
<< formatReadableSizeWithBinarySuffix(sum_bytes)
<< " required now (+" << static_cast<int>((DISK_USAGE_COEFFICIENT_TO_SELECT - 1.0) * 100)
<< "% on overhead); suppressing similar warnings for the next hour");
@ -513,8 +511,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name);
String part_path = disk_reservation->getPath();
String part_path = data.getFullPathOnDisk(disk_reservation->getDisk2());
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
if (Poco::File(new_part_tmp_path).exists())
throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
@ -533,7 +530,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, part_path, future_part.name, future_part.part_info);
data, disk_reservation->getDisk2(), future_part.name, future_part.part_info);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
@ -856,10 +853,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
else
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
String part_path = disk_reservation->getPath();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, part_path, future_part.name, future_part.part_info);
data, disk_reservation->getDisk2(), future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;

View File

@ -136,9 +136,9 @@ void MergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
}
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const String& path_, const String & name_)
MergeTreeDataPart::MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_)
///@TODO_IGR DO check is fromPartName need to use path
: storage(storage_), full_path(path_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
: storage(storage_), disk(disk_), name(name_), info(MergeTreePartInfo::fromPartName(name_, storage.format_version))
{
}
@ -234,7 +234,7 @@ String MergeTreeDataPart::getFullPath() const
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. This is bug.", ErrorCodes::LOGICAL_ERROR);
return full_path + relative_path + "/";
return storage.getFullPathOnDisk(disk) + relative_path + "/";
}
String MergeTreeDataPart::getNameWithPrefix() const
@ -356,6 +356,7 @@ void MergeTreeDataPart::remove() const
* And a race condition can happen that will lead to "File not found" error here.
*/
String full_path = storage.getFullPathOnDisk(disk);
String from = full_path + relative_path;
String to = full_path + "delete_tmp_" + name;
@ -397,7 +398,7 @@ void MergeTreeDataPart::remove() const
void MergeTreeDataPart::renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) const
{
String from = getFullPath();
String to = full_path + new_relative_path + "/";
String to = storage.getFullPathOnDisk(disk) + new_relative_path + "/";
Poco::File from_file(from);
if (!from_file.exists())
@ -443,7 +444,7 @@ String MergeTreeDataPart::getRelativePathForDetachedPart(const String & prefix)
{
res = dst_name();
if (!Poco::File(full_path + res).exists())
if (!Poco::File(storage.getFullPathOnDisk(disk) + res).exists())
return res;
LOG_WARNING(storage.log, "Directory " << dst_name() << " (to detach to) is already exist."
@ -463,7 +464,7 @@ void MergeTreeDataPart::renameToDetached(const String & prefix) const
void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
Poco::Path src(getFullPath());
Poco::Path dst(full_path + getRelativePathForDetachedPart(prefix));
Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
///@TODO_IGR ASK What about another path?
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);

View File

@ -28,12 +28,12 @@ struct MergeTreeDataPart
using Checksums = MergeTreeDataPartChecksums;
using Checksum = MergeTreeDataPartChecksums::Checksum;
MergeTreeDataPart(const MergeTreeData & storage_, const String & path_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), full_path(path_), name(name_), info(info_)
MergeTreeDataPart(const MergeTreeData & storage_, const DiskPtr & disk_, const String & name_, const MergeTreePartInfo & info_)
: storage(storage_), disk(disk_), name(name_), info(info_)
{
}
MergeTreeDataPart(MergeTreeData & storage_, const String & path_, const String & name_);
MergeTreeDataPart(MergeTreeData & storage_, const DiskPtr & disk_, const String & name_);
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
@ -86,7 +86,7 @@ struct MergeTreeDataPart
const MergeTreeData & storage;
String full_path;
DiskPtr disk;
String name;
MergeTreePartInfo info;

View File

@ -169,9 +169,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
size_t expected_size = block.bytes();
auto reservation = data.reserveSpaceForPart(expected_size); ///@TODO_IGR ASK expected size
String part_absolute_path = reservation->getPath();
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, part_absolute_path, part_name, new_part_info);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk2(), part_name, new_part_info);
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
new_data_part->relative_path = TMP_PREFIX + part_name;

View File

@ -32,8 +32,8 @@ void ReplicatedMergeTreeQueue::addVirtualParts(const MergeTreeData::DataParts &
for (const auto & part : parts)
{
current_parts.add("/", part->name);
virtual_parts.add("/", part->name);
current_parts.add(part->name);
virtual_parts.add(part->name);
}
}
@ -122,7 +122,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
{
for (const String & virtual_part_name : entry->getVirtualPartNames())
{
virtual_parts.add("/", virtual_part_name);
virtual_parts.add(virtual_part_name);
updateMutationsPartsToDo(virtual_part_name, /* add = */ true);
}
@ -192,13 +192,13 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
for (const String & virtual_part_name : entry->getVirtualPartNames())
{
ActiveDataPartSet::PartPathNames replaced_parts;
current_parts.add(String("/"), virtual_part_name, &replaced_parts);
Strings replaced_parts;
current_parts.add(virtual_part_name, &replaced_parts);
/// Each part from `replaced_parts` should become Obsolete as a result of executing the entry.
/// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion()
for (const auto & replaced_part : replaced_parts)
updateMutationsPartsToDo(replaced_part.name, /* add = */ false);
for (const String & replaced_part_name : replaced_parts)
updateMutationsPartsToDo(replaced_part_name, /* add = */ false);
}
String drop_range_part_name;
@ -539,9 +539,9 @@ static size_t countPartsToMutate(
/// because they are not consecutive in `parts`.
MergeTreePartInfo covering_part_info(
partition_id, 0, block_num, MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
for (const auto & covered_part : parts.getPartsCoveredBy(covering_part_info))
for (const String & covered_part_name : parts.getPartsCoveredBy(covering_part_info))
{
auto part_info = MergeTreePartInfo::fromPartName(covered_part.name, parts.getFormatVersion());
auto part_info = MergeTreePartInfo::fromPartName(covered_part_name, parts.getFormatVersion());
if (part_info.getDataVersion() < block_num)
++count;
}
@ -1306,7 +1306,7 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
void ReplicatedMergeTreeQueue::disableMergesInRange(const String & part_name)
{
std::lock_guard lock(state_mutex);
virtual_parts.add("/", part_name);
virtual_parts.add(part_name);
}
@ -1572,7 +1572,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
return false;
}
if (prev_virtual_parts.getContainingPart(part->info).name.empty())
if (prev_virtual_parts.getContainingPart(part->info).empty())
{
if (out_reason)
*out_reason = "Entry for part " + part->name + " hasn't been read from the replication log yet";
@ -1610,7 +1610,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
{
/// We look for containing parts in queue.virtual_parts (and not in prev_virtual_parts) because queue.virtual_parts is newer
/// and it is guaranteed that it will contain all merges assigned before this object is constructed.
String containing_part = queue.virtual_parts.getContainingPart(part->info).name;
String containing_part = queue.virtual_parts.getContainingPart(part->info);
if (containing_part != part->name)
{
if (out_reason)
@ -1625,11 +1625,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
left->info.partition_id, left_max_block + 1, right_min_block - 1,
MergeTreePartInfo::MAX_LEVEL, MergeTreePartInfo::MAX_BLOCK_NUMBER);
auto tmp = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
Strings covered;
for (auto & elem : tmp)
covered.push_back(elem.name);
Strings covered = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
if (!covered.empty())
{
if (out_reason)
@ -1671,7 +1667,7 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
std::lock_guard lock(queue.state_mutex);
if (queue.virtual_parts.getContainingPart(part->info).name != part->name)
if (queue.virtual_parts.getContainingPart(part->info) != part->name)
return {};
auto in_partition = queue.mutations_by_partition.find(part->info.partition_id);

View File

@ -633,13 +633,13 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.data_path, args.database_name, args.table_name,
zookeeper_path, replica_name, args.attach, args.database_name, args.table_name,
args.columns, indices_description,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, merging_params, storage_settings, args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.data_path, args.database_name, args.table_name, args.columns, indices_description,
args.database_name, args.table_name, args.columns, indices_description,
args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, merging_params, storage_settings, args.has_force_restore_data_flag);
}

View File

@ -47,7 +47,6 @@ namespace ActionLocks
StorageMergeTree::StorageMergeTree(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,
@ -62,17 +61,14 @@ StorageMergeTree::StorageMergeTree(
const MergeTreeData::MergingParams & merging_params_,
const MergeTreeSettings & settings_,
bool has_force_restore_data_flag)
: path(path_), database_name(database_name_), table_name(table_name_),
: database_name(database_name_), table_name(table_name_),
global_context(context_), background_pool(context_.getBackgroundPool()),
data(database_name, table_name, path_, columns_, indices_,
data(database_name, table_name, columns_, indices_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, merging_params_, settings_, false, attach),
reader(data), writer(data), merger_mutator(data, global_context.getBackgroundPool()),
log(&Logger::get(database_name_ + "." + table_name + " (StorageMergeTree)"))
{
if (path_.empty())
throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
data.loadDataParts(has_force_restore_data_flag);
if (!attach && !data.getDataParts().empty())
@ -176,11 +172,11 @@ void StorageMergeTree::truncate(const ASTPtr &, const Context &)
data.clearOldPartsFromFilesystem();
}
void StorageMergeTree::rename(const String & new_path_to_db, const String & /*new_database_name*/, const String & new_table_name)
void StorageMergeTree::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)
{
data.rename(new_path_to_db, new_table_name);
data.rename(new_database_name, new_table_name);
path = new_path_to_db + escapeForFileName(new_table_name) + '/';
database_name = new_database_name;
table_name = new_table_name;
/// NOTE: Logger names are not updated.
@ -332,7 +328,7 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
auto reservation = data.reserveSpaceForPart(0); ///@TODO_IGR ASK What expected size of mutated part? what size should we reserve?
MergeTreeMutationEntry entry(commands, reservation->getPath(), data.insert_increment.get());
MergeTreeMutationEntry entry(commands, data.getFullPathOnDisk(reservation->getDisk2()), data.insert_increment.get());
String file_name;
{
std::lock_guard lock(currently_merging_mutex);
@ -956,22 +952,22 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
String source_dir = "detached/";
const auto full_paths = data.getFullPaths();
std::map<String, DiskPtr> name_to_disk;
/// Let's make a list of parts to add.
ActiveDataPartSet::PartPathNames parts;
Strings parts;
if (attach_part)
{
for (const String & full_path : full_paths)
parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id});
parts.push_back(partition_id);
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
///@TODO_IGR ASK ActiveDataPartSet without path? Is it possible here?
ActiveDataPartSet active_parts(data.format_version);
for (const String & full_path : full_paths)
const auto disks = data.schema.getDisks();
for (const DiskPtr & disk : disks)
{
const auto full_path = data.getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();
@ -982,21 +978,22 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
continue;
}
LOG_DEBUG(log, "Found part " << name);
active_parts.add(full_path, name); ///@TODO_IGR ASK full_path? full_path + detached?
active_parts.add(name);
name_to_disk[name] = disk;
}
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
parts = active_parts.getParts();
}
for (const auto & source_part : parts)
for (const auto & source_part_name : parts)
{
String source_path = source_dir + source_part.name;
const auto & source_part_disk = name_to_disk[source_part_name];
LOG_DEBUG(log, "Checking data");
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_part.path, source_part.name);
MergeTreeData::MutableDataPartPtr part = data.loadPartAndFixMetadata(source_part_disk, source_part_name);
LOG_INFO(log, "Attaching part " << source_part.name << " from " << source_path);
LOG_INFO(log, "Attaching part " << source_part_name << " from " << data.getFullPathOnDisk(source_part_disk));
data.renameTempPartAndAdd(part, &increment);
LOG_INFO(log, "Finished attaching part");

View File

@ -101,7 +101,6 @@ public:
Names getColumnsRequiredForFinal() const override { return data.getColumnsRequiredForSortingKey(); }
private:
String path;
String database_name;
String table_name;
@ -168,7 +167,6 @@ protected:
* See MergeTreeData constructor for comments on parameters.
*/
StorageMergeTree(
const String & path_,
const String & database_name_,
const String & table_name_,
const ColumnsDescription & columns_,

View File

@ -197,7 +197,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & database_name_, const String & name_,
const String & database_name_, const String & name_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,
@ -211,10 +211,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
bool has_force_restore_data_flag)
: global_context(context_),
database_name(database_name_),
table_name(name_), full_path(path_ + escapeForFileName(table_name) + '/'),
table_name(name_),
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name, table_name)),
replica_name(global_context.getMacros()->expand(replica_name_, database_name, table_name)),
data(database_name, table_name, path_, columns_, indices_,
data(database_name, table_name, columns_, indices_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, merging_params_, settings_, true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
@ -223,9 +223,6 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
cleanup_thread(*this), alter_thread(*this), part_check_thread(*this), restarting_thread(*this),
log(&Logger::get(database_name + "." + table_name + " (StorageReplicatedMergeTree)"))
{
if (path_.empty())
throw Exception("ReplicatedMergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')
zookeeper_path.resize(zookeeper_path.size() - 1);
/// If zookeeper chroot prefix is used, path should start with '/', because chroot concatenates without it.
@ -1697,7 +1694,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (part_desc->src_table_part)
{
/// It is clonable part
adding_parts_active_set.add(full_path, part_desc->new_part_name);
adding_parts_active_set.add(part_desc->new_part_name);
part_name_to_desc.emplace(part_desc->new_part_name, part_desc);
continue;
}
@ -1730,14 +1727,14 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
part_desc->found_new_part_info = MergeTreePartInfo::fromPartName(found_part_name, data.format_version);
part_desc->replica = replica;
adding_parts_active_set.add(full_path, part_desc->found_new_part_name);
adding_parts_active_set.add(part_desc->found_new_part_name);
part_name_to_desc.emplace(part_desc->found_new_part_name, part_desc);
}
/// Check that we could cover whole range
for (PartDescriptionPtr & part_desc : parts_to_add)
{
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).name.empty())
if (adding_parts_active_set.getContainingPart(part_desc->new_part_info).empty())
{
throw Exception("Not found part " + part_desc->new_part_name +
" (or part covering it) neither source table neither remote replicas" , ErrorCodes::NO_REPLICA_HAS_PART);
@ -1749,9 +1746,8 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
auto final_part_names = adding_parts_active_set.getParts();
for (const auto & final_part : final_part_names)
for (const auto & final_part_name : final_part_names)
{
const auto & final_part_name = final_part.name;
auto part_desc = part_name_to_desc[final_part_name];
if (!part_desc)
throw Exception("There is no final part " + final_part_name + ". This is a bug", ErrorCodes::LOGICAL_ERROR);
@ -1930,16 +1926,15 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts_tmp = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet::PartPathNames parts;
Strings parts;
for (const auto & elem : parts_tmp)
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
parts.push_back(elem);
ActiveDataPartSet active_parts_set(data.format_version, parts);
auto active_parts = active_parts_set.getParts();
for (const auto & path_name : active_parts)
for (const auto & name : active_parts)
{
const auto & name = path_name.name;
LogEntry log_entry;
log_entry.type = LogEntry::GET_PART;
log_entry.source_replica = "";
@ -3541,6 +3536,8 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
String source_dir = "detached/";
std::map<String, DiskPtr> name_to_disk;
/// Let's compose a list of parts that should be added.
Strings parts;
if (attach_part)
@ -3553,29 +3550,39 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
ActiveDataPartSet active_parts(data.format_version);
std::set<String> part_names;
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
const auto disks = data.schema.getDisks();
for (const DiskPtr & disk : disks)
{
String name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version))
continue;
if (part_info.partition_id != partition_id)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(full_path, name);
part_names.insert(name);
const auto full_path = data.getFullPathOnDisk(disk);
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir);
it != Poco::DirectoryIterator(); ++it)
{
String name = it.name();
MergeTreePartInfo part_info;
if (!MergeTreePartInfo::tryParsePartName(name, &part_info, data.format_version))
continue;
if (part_info.partition_id != partition_id)
continue;
LOG_DEBUG(log, "Found part " << name);
active_parts.add(name);
part_names.insert(name);
name_to_disk[name] = disk;
}
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
auto tmp_parts = active_parts.getParts();
for (auto & elem : tmp_parts)
parts.push_back(elem.name);
for (const auto & name : tmp_parts)
parts.push_back(name);
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
for (const auto & name : part_names)
{
String containing_part = active_parts.getContainingPart(name).name;
String containing_part = active_parts.getContainingPart(name);
if (!containing_part.empty() && containing_part != name)
{
const auto full_path = data.getFullPathOnDisk(name_to_disk[name]);
Poco::File(full_path + source_dir + name).renameTo(full_path + source_dir + "inactive_" + name);
}
}
}
@ -3585,7 +3592,7 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
for (const String & part : parts)
{
LOG_DEBUG(log, "Checking part " << part);
loaded_parts.push_back(data.loadPartAndFixMetadata(source_dir, source_dir + part));
loaded_parts.push_back(data.loadPartAndFixMetadata(name_to_disk[part], source_dir + part));
}
ReplicatedMergeTreeBlockOutputStream output(*this, 0, 0, 0, false); /// TODO Allow to use quorum here.
@ -3652,13 +3659,12 @@ void StorageReplicatedMergeTree::drop()
}
void StorageReplicatedMergeTree::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
void StorageReplicatedMergeTree::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)
{
data.rename(new_path_to_db, new_table_name);
data.rename(new_database_name, new_table_name);
full_path = new_path_to_db + escapeForFileName(new_table_name) + '/';
table_name = new_table_name;
database_name = new_database_name;
table_name = new_table_name;
/// Update table name in zookeeper
auto zookeeper = getZooKeeper();
@ -4231,9 +4237,9 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
throw Exception("Too many retries to fetch parts from " + best_replica_path, ErrorCodes::TOO_MANY_RETRIES_TO_FETCH_PARTS);
Strings parts_tmp = getZooKeeper()->getChildren(best_replica_path + "/parts");
ActiveDataPartSet::PartPathNames parts;
Strings parts;
for (const auto & elem : parts_tmp)
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
parts.push_back(elem);
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings parts_to_fetch;
@ -4242,7 +4248,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
{
auto tmp = active_parts_set.getParts();
for (auto elem : tmp)
parts_to_fetch.push_back(elem.name);
parts_to_fetch.push_back(elem);
/// Leaving only the parts of the desired partition.
Strings parts_to_fetch_partition;
@ -4261,7 +4267,7 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
{
for (const String & missing_part : missing_parts)
{
String containing_part = active_parts_set.getContainingPart(missing_part).name;
String containing_part = active_parts_set.getContainingPart(missing_part);
if (!containing_part.empty())
parts_to_fetch.push_back(containing_part);
else

View File

@ -192,7 +192,7 @@ public:
part_check_thread.enqueuePart(part_name, delay_to_check_seconds);
}
Strings getDataPaths() const override { return {full_path}; }
Strings getDataPaths() const override { return data.getFullPaths(); }
ASTPtr getPartitionKeyAST() const override { return data.partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return data.getSortingKeyAST(); }
@ -236,7 +236,6 @@ private:
String database_name;
String table_name;
String full_path;
String zookeeper_path;
String replica_name;
@ -554,7 +553,7 @@ protected:
const String & zookeeper_path_,
const String & replica_name_,
bool attach,
const String & path_, const String & database_name_, const String & name_,
const String & database_name_, const String & name_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
Context & context_,