Minor fixes.

This commit is contained in:
Igor Mineev 2019-05-13 23:58:22 +03:00
parent eac8c8c0a4
commit 74466de6d8
7 changed files with 56 additions and 25 deletions

View File

@ -253,13 +253,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
});
PooledReadWriteBufferFromHTTP in{
uri_v0,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
uri_v0,
Poco::Net::HTTPRequest::HTTP_POST,
{},
timeouts,
creds,
DBMS_DEFAULT_BUFFER_SIZE,
data.settings.replicated_max_parallel_fetches_for_host
};
/// We don't know real size of part

View File

@ -89,22 +89,32 @@ Schema::Volume::Volume(const Poco::Util::AbstractConfiguration & config, const s
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
Logger * logger = &Logger::get("StorageConfiguration");
if (has_max_bytes)
{
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes");
}
else if (has_max_ratio)
{
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_bytes");
if (ratio < 0 and ratio > 1)
{
throw Exception("'max_data_part_size_bytes' have to be between 0 and 1",
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
if (ratio < 0)
throw Exception("'max_data_part_size_ratio' have to be not less then 0",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
UInt64 sum_size = 0;
std::vector<UInt64> sizes;
for (const auto & disk : disks)
sum_size += disk->getTotalSpace();
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio);
{
sizes.push_back(disk->getTotalSpace());
sum_size += sizes.back();
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i != disks.size(); ++i)
if (sizes[i] < max_data_part_size)
LOG_WARNING(logger, "Disk " << disks[i]->getName() << " on volume " << config_prefix <<
" have not enough space (" << sizes[i] <<
") for containing part the size of max_data_part_size (" <<
max_data_part_size << ")");
}
else
{
@ -164,6 +174,13 @@ Schema::Disks Schema::getDisks() const
return res;
}
DiskPtr Schema::getAnyDisk() const
{
/// Schema must contain at least one Volume
/// Volume must contain at least one Disk
return volumes[0].disks[0];
}
UInt64 Schema::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
@ -207,12 +224,14 @@ SchemaSelector::SchemaSelector(const Poco::Util::AbstractConfiguration & config,
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix, keys);
Logger * logger = &Logger::get("SchemaSelector");
for (const auto & name : keys)
{
if (!std::all_of(name.begin(), name.end(), isWordCharASCII))
throw Exception("Schema name can contain only alphanumeric and '_' (" + name + ")", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
schemas.emplace(name, Schema{config, config_prefix + "." + name, disks});
LOG_INFO(&Logger::get("StatusFile"), "Storage schema " << name << "loaded"); ///@TODO_IGR ASK Logger?
LOG_INFO(logger, "Storage schema " << name << " Sloaded");
}
constexpr auto default_schema_name = "default";

View File

@ -67,6 +67,9 @@ public:
throwFromErrno("Could not calculate available disk space (statvfs)", ErrorCodes::CANNOT_STATVFS);
UInt64 size = fs.f_blocks * fs.f_bsize;
size -= std::min(size, keep_free_space_bytes);
return size;
}
@ -157,7 +160,7 @@ public:
return size;
}
const DiskPtr & getDisk() const ///@TODO_IGR rename
const DiskPtr & getDisk() const
{
return disk_ptr;
}
@ -165,9 +168,15 @@ public:
Reservation(UInt64 size_, DiskPtr disk_ptr_)
: size(size_), metric_increment(CurrentMetrics::DiskSpaceReservedForMerge, size), disk_ptr(std::move(disk_ptr_)) ///@TODO_IGR ASK DiskSpaceReservedForMerge?
{
/// Just make reservation if size is 0
if (size == 0) {
++reserves->reservation_count;
valid = true;
return;
}
auto unreserved = disk_ptr->getAvailableSpace();
LOG_INFO(&Logger::get("StatusFile"), "Reservation try: Unreserved " << unreserved << " ,size " << size);
LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Unreserved " << unreserved << " , to reserve " << size);
std::lock_guard lock(DiskSpaceMonitor::mutex);
@ -182,11 +191,11 @@ public:
reserves = &DiskSpaceMonitor::reserved[disk_ptr->getName()];
reserves->reserved_bytes += size;
++reserves->reservation_count;
valid = true;
}
/// Reservation valid when reserves not less then 1 byte
explicit operator bool() const noexcept {
return size != 0;
return valid;
}
private:
@ -194,6 +203,7 @@ public:
CurrentMetrics::Increment metric_increment;
DiskReserve * reserves;
DiskPtr disk_ptr;
bool valid = false;
};
using ReservationPtr = std::unique_ptr<Reservation>;
@ -320,6 +330,8 @@ public:
Disks getDisks() const;
DiskPtr getAnyDisk() const;
UInt64 getMaxUnreservedFreeSpace() const;
DiskSpaceMonitor::ReservationPtr reserve(UInt64 expected_size) const;

View File

@ -187,7 +187,7 @@ MergeTreeData::MergeTreeData(
/// If not choose any
if (version_file_path.empty())
version_file_path = schema.getDisks()[0]->getPath() + "format_version.txt";
version_file_path = getFullPathOnDisk(schema.getAnyDisk()) + "format_version.txt";
///@TODO_IGR ASK LOGIC
auto version_file_exists = Poco::File(version_file_path).exists();

View File

@ -479,7 +479,6 @@ void MergeTreeDataPart::makeCloneInDetached(const String & prefix) const
{
Poco::Path src(getFullPath());
Poco::Path dst(storage.getFullPathOnDisk(disk) + getRelativePathForDetachedPart(prefix));
///@TODO_IGR ASK What about another path?
/// Backup is not recursive (max_level is 0), so do not copy inner directories
localBackup(src, dst, 0);
}

View File

@ -198,8 +198,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
/// Size of part would not be grater than block.bytes() + epsilon
size_t expected_size = block.bytes();
auto reservation = data.reserveSpaceForPart(expected_size); ///@TODO_IGR ASK expected size
auto reservation = data.reserveSpaceForPart(expected_size);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(data,
reservation->getDisk(), part_name, new_part_info);

View File

@ -272,9 +272,10 @@ public:
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
/// @TODO_IGR BUG Fix here. When mutation use old path!!!
reserved_space = storage.reserveSpaceForPart(total_size);
if (!reserved_space)
throw Exception("Not enought space", ErrorCodes::NOT_ENOUGH_SPACE); ///@TODO_IGR Edit exception msg
throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE);
for (const auto & part : future_part.parts)
{
@ -334,9 +335,8 @@ public:
void StorageMergeTree::mutate(const MutationCommands & commands, const Context &)
{
///@TODO_IGR ASK What should i do here?
/// Choose any disk.
auto disk = schema.getDisks()[0];
auto disk = schema.getAnyDisk();
MergeTreeMutationEntry entry(commands, getFullPathOnDisk(disk), insert_increment.get());
String file_name;
{