Stylefixes.

This commit is contained in:
Igor Mineev 2019-04-05 20:37:27 +03:00
parent e426f3bd4c
commit 968b7286e2
11 changed files with 126 additions and 110 deletions

View File

@ -1623,7 +1623,6 @@ CompressionCodecPtr Context::chooseCompressionCodec(size_t part_size, double par
}
///@TODO_IGR ASK maybe pointer to Schema?
const Schema& Context::getSchema(const String & name) const
{
auto lock = getLock();

View File

@ -423,6 +423,7 @@ public:
/// Lets you select the compression codec according to the conditions described in the configuration file.
std::shared_ptr<ICompressionCodec> chooseCompressionCodec(size_t part_size, double part_size_ratio) const;
/// Provides storage politics schemes
const Schema& getSchema(const String & name) const;
/// Get the server uptime in seconds.

View File

@ -15,7 +15,8 @@ namespace DB
class ActiveDataPartSet
{
public:
struct PartPathName {
struct PartPathName
{
/// path + name is absolute path to DataPart
String path;
String name;

View File

@ -8,7 +8,8 @@ namespace DB
std::map<String, DiskSpaceMonitor::DiskReserve> DiskSpaceMonitor::reserved;
std::mutex DiskSpaceMonitor::mutex;
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix) {
DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@ -17,63 +18,69 @@ DiskSelector::DiskSelector(const Poco::Util::AbstractConfiguration & config, con
{
UInt64 keep_free_space_bytes = config.getUInt64(config_prefix + "." + disk_name + ".keep_free_space_bytes", 0);
String path;
if (config.has(config_prefix + "." + disk_name + ".path")) {
if (config.has(config_prefix + "." + disk_name + ".path"))
path = config.getString(config_prefix + "." + disk_name + ".path");
}
if (disk_name == default_disk_name) {
if (!path.empty()) {
if (!path.empty())
///@TODO_IGR ASK Rename Default disk to smth? ClickHouse disk? DB disk?
throw Exception("It is not possible to specify default disk path", ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
} else {
if (path.empty()) {
}
else
{
if (path.empty())
throw Exception("Disk path can not be empty. Disk " + disk_name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
}
disks.emplace(disk_name, Disk(disk_name, path, keep_free_space_bytes));
}
}
const Disk & DiskSelector::operator[](const String & name) const {
const Disk & DiskSelector::operator[](const String & name) const
{
auto it = disks.find(name);
if (it == disks.end()) {
if (it == disks.end())
throw Exception("Unknown disk " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG);
}
return it->second;
}
bool DiskSelector::has(const String & name) const {
bool DiskSelector::has(const String & name) const
{
auto it = disks.find(name);
return it != disks.end();
}
Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector) {
void DiskSelector::add(const Disk & disk)
{
disks.emplace(disk.getName(), Disk(disk.getName(), disk.getPath(), disk.getKeepingFreeSpace()));
}
Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disk_selector)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
/// Disk's names
Strings disks_names;
for (const auto & name : keys)
{
if (startsWith(name.data(), "disk")) {
if (startsWith(name.data(), "disk"))
{
disks_names.push_back(config.getString(config_prefix + "." + name));
} else if (name == "part_size_threshold_bytes") {
max_data_part_size = config.getUInt64(config_prefix + "." + name, 0);
}
///@TODO_IGR part_size_threshold_ratio which set max_data_part_size by total disk size
else if (name == "part_size_threshold_bytes")
{
max_data_part_size = config.getUInt64(config_prefix + "." + name);
}
///@TODO_IGR ASK part_size_threshold_ratio which set max_data_part_size by total disk sizes?
}
if (max_data_part_size == 0) {
if (max_data_part_size == 0)
--max_data_part_size;
}
/// Get paths from disk's names
for (const auto & disk_name : disks_names) {
/// Disks operator [] may throw exception
/// Disks operator [] may throw exception
for (const auto & disk_name : disks_names)
disks.push_back(disk_selector[disk_name]);
}
}
Schema::Volume::Volume(const Volume & other, const String & default_path, const String & enclosed_dir)
@ -82,42 +89,48 @@ Schema::Volume::Volume(const Volume & other, const String & default_path, const
last_used(0)
{
auto dir = escapeForFileName(enclosed_dir);
for (auto & disk : disks) {
if (disk.getName() == "default") {
for (auto & disk : disks)
{
if (disk.getName() == "default")
{
disk.SetPath(default_path + dir + '/');
} else {
}
else
{
disk.addEnclosedDirToPath(dir);
}
}
}
DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const {
DiskSpaceMonitor::ReservationPtr Schema::Volume::reserve(UInt64 expected_size) const
{
/// This volume can not store files which size greater than max_data_part_size
if (expected_size > max_data_part_size) {
if (expected_size > max_data_part_size)
return {};
}
/// Real order is not necessary
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
for (size_t i = 0; i != disks.size(); ++i) {
for (size_t i = 0; i != disks.size(); ++i)
{
size_t index = (start_from + i) % disks.size();
auto reservation = DiskSpaceMonitor::tryToReserve(disks[index], expected_size);
if (reservation) {
if (reservation)
return reservation;
}
}
return {};
}
UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const {
UInt64 Schema::Volume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & disk : disks) {
///@TODO_IGR ASK There is cycle with mutex locking inside(((
///@TODO_IGR ASK There is cycle with mutex locking inside(((
for (const auto & disk : disks)
res = std::max(res, DiskSpaceMonitor::getUnreservedFreeSpace(disk));
}
return res;
}
Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks) {
Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
@ -130,40 +143,42 @@ Schema::Schema(const Poco::Util::AbstractConfiguration & config, const std::stri
}
}
///@TODO_IGR ASK maybe iterator without copy?
Strings Schema::getFullPaths() const {
///@TODO_IGR ASK maybe iteratable object without copy?
Strings Schema::getFullPaths() const
{
Strings res;
for (const auto & volume : volumes) {
for (const auto & disk : volume.disks) {
for (const auto & volume : volumes)
for (const auto & disk : volume.disks)
res.push_back(disk.getPath());
}
}
return res;
}
UInt64 Schema::getMaxUnreservedFreeSpace() const {
UInt64 Schema::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
for (const auto & volume : volumes) {
for (const auto & volume : volumes)
res = std::max(res, volume.getMaxUnreservedFreeSpace());
}
return res;
}
DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const {
for (auto & volume : volumes) {
DiskSpaceMonitor::ReservationPtr Schema::reserve(UInt64 expected_size) const
{
for (auto & volume : volumes)
{
auto reservation = volume.reserve(expected_size);
if (reservation) {
if (reservation)
return reservation;
}
}
return {};
}
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, String config_prefix) {
SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config, String config_prefix)
{
DiskSelector disks(config, config_prefix + ".disks");
constexpr auto default_disk_name = "default";
if (!disks.has(default_disk_name)) {
if (!disks.has(default_disk_name))
{
std::cerr << "No default disk settings" << std::endl;
disks.add(Disk(default_disk_name, "", 0));
}
@ -181,18 +196,17 @@ SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config,
}
constexpr auto default_schema_name = "default";
if (schemes.find(default_schema_name) == schemes.end()) {
if (schemes.find(default_schema_name) == schemes.end())
schemes.emplace(default_schema_name, Schema(Schema::Volumes{Schema::Volume::Disks{disks[default_disk_name]}}));
}
std::cerr << schemes.size() << " schemes loaded" << std::endl; ///@TODO_IGR ASK logs?
}
const Schema & SchemaSelector::operator[](const String & name) const {
const Schema & SchemaSelector::operator[](const String & name) const
{
auto it = schemes.find(name);
if (it == schemes.end()) {
if (it == schemes.end())
throw Exception("Unknown schema " + name, ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG); ///@TODO_IGR Choose error code
}
return it->second;
}

View File

@ -25,6 +25,7 @@ namespace ErrorCodes
extern const int CANNOT_STATVFS;
extern const int NOT_ENOUGH_SPACE;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
}
/** path - Contain path to data on disk.
@ -42,23 +43,28 @@ public:
{
}
const String & getName() const {
const String & getName() const
{
return name;
}
const String & getPath() const {
const String & getPath() const
{
return path;
}
UInt64 getKeepingFreeSpace() const {
UInt64 getKeepingFreeSpace() const
{
return keep_free_space_bytes;
}
void addEnclosedDirToPath(const String & dir) {
void addEnclosedDirToPath(const String & dir)
{
path += dir + '/';
}
void SetPath(const String & path_) {
void SetPath(const String & path_)
{
path = path_;
}
@ -79,7 +85,8 @@ private:
class DiskSpaceMonitor
{
public:
struct DiskReserve {
struct DiskReserve
{
UInt64 reserved_bytes;
UInt64 reservation_count;
};
@ -226,9 +233,7 @@ public:
bool has(const String & name) const;
void add(const Disk & disk) {
disks.emplace(disk.getName(), Disk(disk.getName(), disk.getPath(), disk.getKeepingFreeSpace()));
}
void add(const Disk & disk);
private:
std::map<String, Disk> disks;
@ -239,7 +244,8 @@ class Schema
{
public:
class Volume {
class Volume
{
friend class Schema;
public:
@ -293,7 +299,8 @@ public:
Schema(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, const DiskSelector & disks);
Schema(const Schema & other, const String & default_path, const String & enclosed_dir) {
Schema(const Schema & other, const String & default_path, const String & enclosed_dir)
{
for (const auto & volume : other.volumes) {
volumes.push_back(Volume(volume, default_path, enclosed_dir));
}
@ -309,7 +316,8 @@ private:
Volumes volumes;
};
class SchemaSelector {
class SchemaSelector
{
public:
SchemaSelector(const Poco::Util::AbstractConfiguration & config, String config_prefix);

View File

@ -165,9 +165,9 @@ MergeTreeData::MergeTreeData(
/// Creating directories, if not exist.
Poco::File(full_path).createDirectories();
for (const String & path : getFullPaths()) {
for (const String & path : getFullPaths())
{
Poco::File(path).createDirectories();
Poco::File(path + "detached").createDirectory();
}
@ -858,13 +858,16 @@ void MergeTreeData::clearOldTemporaryDirectories(ssize_t custom_directories_life
{
Poco::File tmp_dir(full_path + it.name());
try {
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline)) {
try
{
if (tmp_dir.isDirectory() && isOldPartDirectory(tmp_dir, deadline))
{
LOG_WARNING(log, "Removing temporary directory " << full_path << it.name());
Poco::File(full_path + it.name()).remove(true);
}
}
catch (const Poco::FileNotFoundException &) {
catch (const Poco::FileNotFoundException &)
{
/// If the file is already deleted, do nothing.
}
}
@ -1014,9 +1017,8 @@ void MergeTreeData::dropAllData()
LOG_TRACE(log, "dropAllData: removing data from filesystem.");
for (auto && full_path : getFullPaths()) {
for (auto && full_path : getFullPaths())
Poco::File(full_path).remove(true);
}
LOG_TRACE(log, "dropAllData: done.");
}
@ -2294,7 +2296,8 @@ size_t MergeTreeData::getPartitionSize(const std::string & partition_id) const
continue;
const auto part_path = it.path().absolute().toString();
for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2) {
for (Poco::DirectoryIterator it2(part_path); it2 != end; ++it2)
{
const auto part_file_path = it2.path().absolute().toString();
size += Poco::File(part_file_path).getSize();
}
@ -2429,13 +2432,12 @@ DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expec
constexpr UInt64 SIZE_1MB = 1ull << 20; ///@TODO_IGR ASK Is it OK?
constexpr UInt64 MAGIC_CONST = 1;
if (expected_size < SIZE_1MB) {
if (expected_size < SIZE_1MB)
expected_size = SIZE_1MB;
}
auto reservation = reserveSpaceAtDisk(expected_size * MAGIC_CONST);
if (reservation) {
if (reservation)
return reservation;
}
throw Exception("Not enough free disk space to reserve: " + formatReadableSizeWithBinarySuffix(expected_size) + " requested, "
+ formatReadableSizeWithBinarySuffix(schema.getMaxUnreservedFreeSpace()) + " available",
@ -2647,7 +2649,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPart(const Merg
return dst_data_part;
}
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size) const {
DiskSpaceMonitor::ReservationPtr MergeTreeData::reserveSpaceAtDisk(UInt64 expected_size) const
{
return schema.reserve(expected_size);
}

View File

@ -572,9 +572,7 @@ public:
DiskSpaceMonitor::ReservationPtr reserveSpaceAtDisk(UInt64 expected_size) const; ///@TODO_IGR ASK Maybe set this method as private?
Strings getFullPaths() const {
return schema.getFullPaths();
}
Strings getFullPaths() const { return schema.getFullPaths(); }
MergeTreeDataFormatVersion format_version;

View File

@ -1627,9 +1627,9 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
auto tmp = queue.virtual_parts.getPartsCoveredBy(gap_part_info);
Strings covered;
for (auto & elem : tmp) {
for (auto & elem : tmp)
covered.push_back(elem.name);
}
if (!covered.empty())
{
if (out_reason)

View File

@ -273,9 +273,8 @@ public:
{
/// Assume mutex is already locked, because this method is called from mergeTask.
reserved_space = storage.data.reserveSpaceForPart(total_size);
if (!reserved_space) {
if (!reserved_space)
throw Exception("Not enought space", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR Edit exception msg
}
for (const auto & part : future_part.parts)
{
@ -962,16 +961,16 @@ void StorageMergeTree::attachPartition(const ASTPtr & partition, bool attach_par
ActiveDataPartSet::PartPathNames parts;
if (attach_part)
{
for (const String & full_path : full_paths) {
for (const String & full_path : full_paths)
parts.push_back(ActiveDataPartSet::PartPathName{full_path, partition_id});
}
}
else
{
LOG_DEBUG(log, "Looking for parts for partition " << partition_id << " in " << source_dir);
///@TODO_IGR ASK ActiveDataPartSet without path? Is it possible here?
ActiveDataPartSet active_parts(data.format_version);
for (const String & full_path : full_paths) {
for (const String & full_path : full_paths)
{
for (Poco::DirectoryIterator it = Poco::DirectoryIterator(full_path + source_dir); it != Poco::DirectoryIterator(); ++it)
{
const String & name = it.name();

View File

@ -1046,9 +1046,8 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = data.reserveSpaceForPart(estimated_space_for_merge);
if (!reserved_space) {
if (!reserved_space)
throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX
}
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
@ -1179,9 +1178,8 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
/// Can throw an exception.
DiskSpaceMonitor::ReservationPtr reserved_space = data.reserveSpaceForPart(estimated_space_for_result);
if (!reserved_space) {
if (!reserved_space)
throw Exception("TMP MSG", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR FIX
}
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
@ -1933,9 +1931,8 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
/// Add to the queue jobs to receive all the active parts that the reference/master replica has.
Strings parts_tmp = zookeeper->getChildren(source_path + "/parts");
ActiveDataPartSet::PartPathNames parts;
for (const auto & elem : parts_tmp) {
for (const auto & elem : parts_tmp)
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
}
ActiveDataPartSet active_parts_set(data.format_version, parts);
@ -3570,9 +3567,8 @@ void StorageReplicatedMergeTree::attachPartition(const ASTPtr & partition, bool
}
LOG_DEBUG(log, active_parts.size() << " of them are active");
auto tmp_parts = active_parts.getParts();
for (auto & elem : tmp_parts) {
for (auto & elem : tmp_parts)
parts.push_back(elem.name);
}
/// Inactive parts rename so they can not be attached in case of repeated ATTACH.
for (const auto & name : part_names)
@ -4238,18 +4234,17 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
Strings parts_tmp = getZooKeeper()->getChildren(best_replica_path + "/parts");
ActiveDataPartSet::PartPathNames parts;
for (const auto & elem : parts_tmp) {
for (const auto & elem : parts_tmp)
parts.push_back(ActiveDataPartSet::PartPathName{"/", elem});
}
ActiveDataPartSet active_parts_set(data.format_version, parts);
Strings parts_to_fetch;
if (missing_parts.empty())
{
auto tmp = active_parts_set.getParts();
for (auto elem : tmp) {
for (auto elem : tmp)
parts_to_fetch.push_back(elem.name);
}
/// Leaving only the parts of the desired partition.
Strings parts_to_fetch_partition;

View File

@ -195,11 +195,9 @@ protected:
if (columns_mask[src_index++])
res_columns[res_index++]->insert(0u); // is_temporary
if (columns_mask[src_index++]) {
for (const String & path : tables_it->table()->getDataPaths() ) {
if (columns_mask[src_index++])
for (const String & path : tables_it->table()->getDataPaths())
res_columns[res_index++]->insert(path);
}
}
if (columns_mask[src_index++])
res_columns[res_index++]->insert(database->getTableMetadataPath(table_name));