More constants, better tests

This commit is contained in:
alesapin 2019-09-09 16:50:19 +03:00
parent 2288d25972
commit 9f88baebb9
6 changed files with 215 additions and 112 deletions

View File

@ -133,7 +133,7 @@ Reservation::~Reservation()
if (disk_ptr->reserved_bytes < size)
{
disk_ptr->reserved_bytes = 0;
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size. It's a bug.");
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservations size for disk '" + disk_ptr->getName() + "'.");
}
else
{
@ -141,7 +141,7 @@ Reservation::~Reservation()
}
if (disk_ptr->reservation_count == 0)
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count. It's a bug.");
LOG_ERROR(&Logger::get("DiskSpaceMonitor"), "Unbalanced reservation count for disk '" + disk_ptr->getName() + "'.");
else
--disk_ptr->reservation_count;
}
@ -276,7 +276,7 @@ Volume::Volume(
if (has_max_bytes)
{
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes");
max_data_part_size = config.getUInt64(config_prefix + ".max_data_part_size_bytes", 0);
}
else if (has_max_ratio)
{
@ -299,10 +299,6 @@ Volume::Volume(
") for containing part the size of max_data_part_size (" <<
max_data_part_size << ")");
}
else
{
max_data_part_size = UNLIMITED_PARTITION_SIZE;
}
constexpr UInt64 MIN_PART_SIZE = 8u * 1024u * 1024u;
if (max_data_part_size < MIN_PART_SIZE)
LOG_WARNING(logger, "Volume '" << name << "' max_data_part_size is too low ("
@ -315,7 +311,7 @@ ReservationPtr Volume::reserve(UInt64 expected_size) const
{
/// This volume can not store files which size greater than max_data_part_size
if (expected_size > max_data_part_size)
if (max_data_part_size != 0 && expected_size > max_data_part_size)
return {};
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
@ -532,7 +528,8 @@ StoragePolicySelector::StoragePolicySelector(
auto default_volume = std::make_shared<Volume>(
default_volume_name,
std::vector<DiskPtr>{disks[default_disk_name]},
UNLIMITED_PARTITION_SIZE);
0);
auto default_policy = std::make_shared<StoragePolicy>(default_storage_policy_name, Volumes{default_volume}, 0.0);
policies.emplace(default_storage_policy_name, default_policy);
}

View File

@ -43,7 +43,6 @@ namespace ErrorCodes
namespace DiskSpace
{
static constexpr UInt64 UNLIMITED_PARTITION_SIZE = std::numeric_limits<UInt64>::max();
class Reservation;
using ReservationPtr = std::unique_ptr<Reservation>;
@ -118,10 +117,9 @@ public:
throw Exception("Disk path must ends with '/', but '" + path + "' doesn't.", ErrorCodes::LOGICAL_ERROR);
}
/// Reserves bytes on disk, if not possible returns nullptr.
ReservationPtr reserve(UInt64 bytes) const override;
bool tryReserve(UInt64 bytes) const;
const String & getName() const override { return name; }
const String & getPath() const { return path; }
@ -137,14 +135,18 @@ public:
UInt64 getUnreservedSpace() const;
private:
String name;
String path;
UInt64 keep_free_space_bytes;
const String name;
const String path;
const UInt64 keep_free_space_bytes;
/// Real reservation data
static std::mutex mutex;
mutable UInt64 reserved_bytes = 0;
mutable UInt64 reservation_count = 0;
private:
/// Reserves bytes on disk, if not possible returns false
bool tryReserve(UInt64 bytes) const;
};
/// It is not possible to change disk runtime.
@ -154,7 +156,7 @@ using Disks = std::vector<DiskPtr>;
/**
* Information about reserved size on concrete disk.
* Unreserve on destroy.
* Unreserve on destroy. Doesn't reserve bytes in constructor.
*/
class Reservation final : private boost::noncopyable
{
@ -182,7 +184,7 @@ private:
};
/// Parse .xml configuration and store information about disks
/// Mostly used for introspection
/// Mostly used for introspection.
class DiskSelector
{
public:
@ -231,13 +233,13 @@ public:
const String & getName() const override { return name; }
UInt64 max_data_part_size = UNLIMITED_PARTITION_SIZE;
UInt64 max_data_part_size = 0;
Disks disks;
private:
mutable std::atomic<size_t> last_used = 0;
String name;
const String name;
};
using VolumePtr = std::shared_ptr<const Volume>;
@ -277,7 +279,7 @@ public:
size_t getVolumePriorityByDisk(const DiskPtr & disk_ptr) const;
/// Reserves 0 bytes on disk with max available space
/// Do not use this function when it is possible to predict size!!!
/// Do not use this function when it is possible to predict size.
ReservationPtr makeEmptyReservationOnLargestDisk() const;
const Volumes & getVolumes() const { return volumes; }
@ -296,7 +298,7 @@ public:
private:
Volumes volumes;
String name;
const String name;
std::map<String, size_t> volumes_names;
/// move_factor from interval [0., 1.]

View File

@ -529,7 +529,7 @@ public:
/// parts should be sorted.
MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
const FutureMergedMutatedPart & future_part, MergeList::Entry & merge_entry, TableStructureReadLockHolder &,
time_t time_of_merge, DiskSpace::Reservation * disk_reservation, bool deduplicate, bool force_ttl)
time_t time_of_merge, DiskSpace::Reservation * space_reservation, bool deduplicate, bool force_ttl)
{
static const String TMP_PREFIX = "tmp_merge_";
@ -542,7 +542,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
<< parts.front()->name << " to " << parts.back()->name
<< " into " << TMP_PREFIX + future_part.name);
String part_path = data.getFullPathOnDisk(disk_reservation->getDisk());
String part_path = data.getFullPathOnDisk(space_reservation->getDisk());
String new_part_tmp_path = part_path + TMP_PREFIX + future_part.name + "/";
if (Poco::File(new_part_tmp_path).exists())
throw Exception("Directory " + new_part_tmp_path + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
@ -563,7 +563,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
data.merging_params, gathering_columns, gathering_column_names, merging_columns, merging_column_names);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, disk_reservation->getDisk(), future_part.name, future_part.part_info);
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
new_data_part->partition.assign(future_part.getPartition());
new_data_part->relative_path = TMP_PREFIX + future_part.name;
new_data_part->is_temp = true;
@ -737,7 +737,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
to.writePrefix();
size_t rows_written = 0;
const size_t initial_reservation = disk_reservation ? disk_reservation->getSize() : 0;
const size_t initial_reservation = space_reservation ? space_reservation->getSize() : 0;
auto is_cancelled = [&]() { return merges_blocker.isCancelled()
|| (need_remove_expired_values && ttl_merges_blocker.isCancelled()); };
@ -753,7 +753,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
merge_entry->bytes_written_uncompressed = merged_stream->getProfileInfo().bytes;
/// Reservation updates is not performed yet, during the merge it may lead to higher free space requirements
if (disk_reservation && sum_input_rows_upper_bound)
if (space_reservation && sum_input_rows_upper_bound)
{
/// The same progress from merge_entry could be used for both algorithms (it should be more accurate)
/// But now we are using inaccurate row-based estimation in Horizontal case for backward compatibility
@ -761,7 +761,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
? std::min(1., 1. * rows_written / sum_input_rows_upper_bound)
: std::min(1., merge_entry->progress.load(std::memory_order_relaxed));
disk_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
space_reservation->update(static_cast<size_t>((1. - progress) * initial_reservation));
}
}
merged_stream->readSuffix();
@ -898,7 +898,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const std::vector<MutationCommand> & commands,
MergeListEntry & merge_entry,
const Context & context,
DiskSpace::Reservation * disk_reservation,
DiskSpace::Reservation * space_reservation,
TableStructureReadLockHolder & table_lock_holder)
{
auto check_not_cancelled = [&]()
@ -945,7 +945,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
LOG_TRACE(log, "Mutating part " << source_part->name << " to mutation version " << future_part.part_info.mutation);
MergeTreeData::MutableDataPartPtr new_data_part = std::make_shared<MergeTreeData::DataPart>(
data, disk_reservation->getDisk(), future_part.name, future_part.part_info);
data, space_reservation->getDisk(), future_part.name, future_part.part_info);
new_data_part->relative_path = "tmp_mut_" + future_part.name;
new_data_part->is_temp = true;
new_data_part->ttl_infos = source_part->ttl_infos;

View File

@ -32,36 +32,30 @@ BlockInputStreams StorageSystemDisks::read(
{
check(column_names);
MutableColumnPtr col_name_mut = ColumnString::create();
MutableColumnPtr col_path_mut = ColumnString::create();
MutableColumnPtr col_free_mut = ColumnUInt64::create();
MutableColumnPtr col_total_mut = ColumnUInt64::create();
MutableColumnPtr col_keep_mut = ColumnUInt64::create();
MutableColumnPtr col_name = ColumnString::create();
MutableColumnPtr col_path = ColumnString::create();
MutableColumnPtr col_free = ColumnUInt64::create();
MutableColumnPtr col_total = ColumnUInt64::create();
MutableColumnPtr col_keep = ColumnUInt64::create();
const auto & disk_selector = context.getDiskSelector();
for (const auto & [disk_name, disk_ptr] : disk_selector.getDisksMap())
{
col_name_mut->insert(disk_name);
col_path_mut->insert(disk_ptr->getPath());
col_free_mut->insert(disk_ptr->getAvailableSpace());
col_total_mut->insert(disk_ptr->getTotalSpace());
col_keep_mut->insert(disk_ptr->getKeepingFreeSpace());
col_name->insert(disk_name);
col_path->insert(disk_ptr->getPath());
col_free->insert(disk_ptr->getAvailableSpace());
col_total->insert(disk_ptr->getTotalSpace());
col_keep->insert(disk_ptr->getKeepingFreeSpace());
}
ColumnPtr col_name = std::move(col_name_mut);
ColumnPtr col_path = std::move(col_path_mut);
ColumnPtr col_free = std::move(col_free_mut);
ColumnPtr col_total = std::move(col_total_mut);
ColumnPtr col_keep = std::move(col_keep_mut);
Block res = getSampleBlock().cloneEmpty();
size_t col_num = 0;
res.getByPosition(col_num++).column = col_name;
res.getByPosition(col_num++).column = col_path;
res.getByPosition(col_num++).column = col_free;
res.getByPosition(col_num++).column = col_total;
res.getByPosition(col_num++).column = col_keep;
res.getByPosition(col_num++).column = std::move(col_name);
res.getByPosition(col_num++).column = std::move(col_path);
res.getByPosition(col_num++).column = std::move(col_free);
res.getByPosition(col_num++).column = std::move(col_total);
res.getByPosition(col_num++).column = std::move(col_keep);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
}

View File

@ -15,13 +15,14 @@ namespace ErrorCodes
StorageSystemStoragePolicies::StorageSystemStoragePolicies(const std::string & name_)
: name(name_)
{
setColumns(ColumnsDescription(
{
{"policy_name", std::make_shared<DataTypeString>()},
{"volume_name", std::make_shared<DataTypeString>()},
{"volume_priority", std::make_shared<DataTypeUInt64>()},
{"disks", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"max_data_part_size", std::make_shared<DataTypeUInt64>()},
setColumns(
ColumnsDescription({
{"policy_name", std::make_shared<DataTypeString>()},
{"volume_name", std::make_shared<DataTypeString>()},
{"volume_priority", std::make_shared<DataTypeUInt64>()},
{"disks", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"max_data_part_size", std::make_shared<DataTypeUInt64>()},
{"move_factor", std::make_shared<DataTypeFloat32>()}
}));
}
@ -35,11 +36,12 @@ BlockInputStreams StorageSystemStoragePolicies::read(
{
check(column_names);
MutableColumnPtr col_policy_name_mut = ColumnString::create();
MutableColumnPtr col_volume_name_mut = ColumnString::create();
MutableColumnPtr col_priority_mut = ColumnUInt64::create();
MutableColumnPtr col_disks_mut = ColumnArray::create(ColumnString::create());
MutableColumnPtr col_max_part_size_mut = ColumnUInt64::create();
MutableColumnPtr col_policy_name = ColumnString::create();
MutableColumnPtr col_volume_name = ColumnString::create();
MutableColumnPtr col_priority = ColumnUInt64::create();
MutableColumnPtr col_disks = ColumnArray::create(ColumnString::create());
MutableColumnPtr col_max_part_size = ColumnUInt64::create();
MutableColumnPtr col_move_factor = ColumnFloat32::create();
const auto & policy_selector = context.getStoragePolicySelector();
@ -48,31 +50,29 @@ BlockInputStreams StorageSystemStoragePolicies::read(
const auto & volumes = policy_ptr->getVolumes();
for (size_t i = 0; i != volumes.size(); ++i)
{
col_policy_name_mut->insert(policy_name);
col_volume_name_mut->insert(volumes[i]->getName());
col_priority_mut->insert(i);
col_policy_name->insert(policy_name);
col_volume_name->insert(volumes[i]->getName());
col_priority->insert(i + 1);
Array disks;
disks.reserve(volumes[i]->disks.size());
for (const auto & disk_ptr : volumes[i]->disks)
disks.push_back(disk_ptr->getName());
col_disks_mut->insert(disks);
col_max_part_size_mut->insert(volumes[i]->max_data_part_size);
col_disks->insert(disks);
col_max_part_size->insert(volumes[i]->max_data_part_size);
col_move_factor->insert(policy_ptr->getMoveFactor());
}
}
ColumnPtr col_policy_name = std::move(col_policy_name_mut);
ColumnPtr col_volume_name = std::move(col_volume_name_mut);
ColumnPtr col_priority = std::move(col_priority_mut);
ColumnPtr col_disks = std::move(col_disks_mut);
ColumnPtr col_max_part_size = std::move(col_max_part_size_mut);
Block res = getSampleBlock().cloneEmpty();
size_t col_num = 0;
res.getByPosition(col_num++).column = col_policy_name;
res.getByPosition(col_num++).column = col_volume_name;
res.getByPosition(col_num++).column = col_priority;
res.getByPosition(col_num++).column = col_disks;
res.getByPosition(col_num++).column = col_max_part_size;
res.getByPosition(col_num++).column = std::move(col_policy_name);
res.getByPosition(col_num++).column = std::move(col_volume_name);
res.getByPosition(col_num++).column = std::move(col_priority);
res.getByPosition(col_num++).column = std::move(col_disks);
res.getByPosition(col_num++).column = std::move(col_max_part_size);
res.getByPosition(col_num++).column = std::move(col_move_factor);
return BlockInputStreams(1, std::make_shared<OneBlockInputStream>(res));
}

View File

@ -2,9 +2,11 @@ import time
import pytest
import random
import string
import json
from multiprocessing.dummy import Pool
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__)
@ -33,50 +35,158 @@ def start_cluster():
cluster.shutdown()
def test_system_tables(start_cluster):
expected_disks_data = [
{
"name": "default",
"path": "/var/lib/clickhouse/data/",
"keep_free_space": '1024',
},
{
"name": "jbod1",
"path": "/jbod1/",
"keep_free_space": '0',
},
{
"name": "jbod2",
"path": "/jbod2/",
"keep_free_space": '10485760',
},
{
"name": "external",
"path": "/external/",
"keep_free_space": '0',
}
]
click_disk_data = json.loads(node1.query("SELECT name, path, keep_free_space FROM system.disks FORMAT JSON"))["data"]
assert sorted(click_disk_data, key=lambda x: x["name"]) == sorted(expected_disks_data, key=lambda x: x["name"])
expected_policies_data = [
{
"policy_name": "small_jbod_with_external",
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1"],
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "small_jbod_with_external",
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "jbods_with_external",
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1", "jbod2"],
"max_data_part_size": "10485760",
"move_factor": 0.1,
},
{
"policy_name": "jbods_with_external",
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"max_data_part_size": "0",
"move_factor": 0.1,
},
{
"policy_name": "moving_jbod_with_external",
"volume_name": "main",
"volume_priority": "1",
"disks": ["jbod1"],
"max_data_part_size": "0",
"move_factor": 0.7,
},
{
"policy_name": "moving_jbod_with_external",
"volume_name": "external",
"volume_priority": "2",
"disks": ["external"],
"max_data_part_size": "0",
"move_factor": 0.7,
},
{
"policy_name": "default_disk_with_external",
"volume_name": "small",
"volume_priority": "1",
"disks": ["default"],
"max_data_part_size": "2097152",
"move_factor": 0.1,
},
{
"policy_name": "default_disk_with_external",
"volume_name": "big",
"volume_priority": "2",
"disks": ["external"],
"max_data_part_size": "20971520",
"move_factor": 0.1,
},
]
clickhouse_policies_data = json.loads(node1.query("SELECT * FROM system.storage_policies WHERE policy_name != 'default' FORMAT JSON"))["data"]
def key(x):
return (x["policy_name"], x["volume_name"], x["volume_priority"])
assert sorted(clickhouse_policies_data, key=key) == sorted(expected_policies_data, key=key)
def test_query_parser(start_cluster):
with pytest.raises(QueryRuntimeException):
try:
with pytest.raises(QueryRuntimeException):
node1.query("""
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='very_exciting_policy'
""")
with pytest.raises(QueryRuntimeException):
node1.query("""
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='jbod1'
""")
node1.query("""
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='very_exciting_policy'
CREATE TABLE table_with_normal_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='default'
""")
with pytest.raises(QueryRuntimeException):
node1.query("""
CREATE TABLE table_with_absent_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='jbod1'
""")
node1.query("INSERT INTO table_with_normal_policy VALUES (5)")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'all' TO VOLUME 'some_volume'")
node1.query("""
CREATE TABLE table_with_normal_policy (
d UInt64
) ENGINE = MergeTree()
ORDER BY d
SETTINGS storage_policy_name='default'
""")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'all' TO DISK 'some_volume'")
node1.query("INSERT INTO table_with_normal_policy VALUES (5)")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PART 'xxxxx' TO DISK 'jbod1'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'all' TO VOLUME 'some_volume'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'yyyy' TO DISK 'jbod1'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'all' TO DISK 'some_volume'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PART 'xxxxx' TO DISK 'jbod1'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MOVE PARTITION 'yyyy' TO DISK 'jbod1'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy_name='moving_jbod_with_external'")
with pytest.raises(QueryRuntimeException):
node1.query("ALTER TABLE table_with_normal_policy MODIFY SETTING storage_policy_name='moving_jbod_with_external'")
finally:
node1.query("DROP TABLE IF EXISTS table_with_normal_policy")
def get_random_string(length):