Ready for review

This commit is contained in:
Igr Mineev 2019-08-01 13:29:14 +03:00
parent 7f7b47fc0a
commit fc79ed86ef
11 changed files with 76 additions and 214 deletions

View File

@ -262,9 +262,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
Poco::File(path + "data/" + default_database).createDirectories();
Poco::File(path + "metadata/" + default_database).createDirectories();
/// Check that we have write access to all data paths
/// Check that we have read and write access to all data paths
auto disk_selector = global_context->getDiskSelector();
for (auto [name, disk] : disk_selector.getDisksMap()) {
for (auto [name, disk] : disk_selector.getDisksMap())
{
Poco::File disk_path(disk->getPath());
if (!disk_path.canRead() or !disk_path.canWrite())
throw Exception("There is no RW access to disk " + name + " (" + disk->getPath() + ")", ErrorCodes::PATH_ACCESS_DENIED);
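
For illustration, the same probe as a standalone helper — a minimal sketch assuming only Poco; check_disk_access is a hypothetical name, not part of this commit:

#include <Poco/File.h>
#include <stdexcept>
#include <string>

/// Hypothetical helper mirroring the loop above: a disk is usable only if
/// the server process can both read and write its root path.
void check_disk_access(const std::string & name, const std::string & path)
{
    Poco::File disk_path(path);
    if (!disk_path.canRead() || !disk_path.canWrite())
        throw std::runtime_error("There is no RW access to disk " + name + " (" + path + ")");
}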

View File

@ -1755,18 +1755,13 @@ const DiskSpace::StoragePolicyPtr & Context::getStoragePolicy(const String &name
{
auto lock = getLock();
if (!shared->merge_tree_storage_policy_selector)
{
constexpr auto config_name = "storage_configuration.policies";
auto & config = getConfigRef();
auto & policy_selector = getStoragePolicySelector();
shared->merge_tree_storage_policy_selector = std::make_unique<DiskSpace::StoragePolicySelector>(config, config_name, getDiskSelector());
}
return (*shared->merge_tree_storage_policy_selector)[name];
return policy_selector[name];
}
DiskSpace::StoragePolicySelector & Context::getStoragePolicySelector() const
{
auto lock = getLock();

View File

@ -318,7 +318,7 @@ public:
/** Notify engine about updated dependencies for this storage. */
virtual void updateDependencies() {}
/// Returns data path if storage supports it, empty vector otherwise.
/// Returns data paths if the storage supports them, an empty vector otherwise.
virtual Strings getDataPaths() const { return {}; }
/// Returns ASTExpressionList of partition key expression for storage or nullptr if there is none.
@ -351,7 +351,7 @@ public:
/// Returns names of primary key + secondary sorting columns
virtual Names getSortingKeyColumns() const { return {}; }
/// Returns storage policy if table supports it
/// Returns the storage policy if the storage supports one
virtual DiskSpace::StoragePolicyPtr getStoragePolicy() const { return {}; }
private:
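
A hypothetical engine overriding both accessors might look like the sketch below; StorageExample and its member are illustrative only, and the other required IStorage overrides are omitted:

class StorageExample : public IStorage
{
public:
    /// One data path per disk this table writes to.
    Strings getDataPaths() const override { return {"/jbod1/data/example/", "/jbod2/data/example/"}; }

    /// Expose the policy so generic code can reserve space through it.
    DiskSpace::StoragePolicyPtr getStoragePolicy() const override { return storage_policy; }

private:
    DiskSpace::StoragePolicyPtr storage_policy;
};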

View File

@ -237,6 +237,8 @@ StoragePolicy::StoragePolicy(String name_, const Poco::Util::AbstractConfigurati
disk_names.insert(disk->getName());
}
}
move_factor = config.getDouble(config_prefix + ".move_factor", 0.1);
}
@ -348,7 +350,8 @@ StoragePolicySelector::StoragePolicySelector(const Poco::Util::AbstractConfigura
policies.emplace(default_storage_policy_name,
std::make_shared<StoragePolicy>(default_storage_policy_name,
Volumes{std::make_shared<Volume>(default_volume_name, std::vector<DiskPtr>{disks[default_disk_name]},
std::numeric_limits<UInt64>::max())}));
std::numeric_limits<UInt64>::max())},
0.0));
}
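
In plain numbers: with the default move_factor of 0.1, a volume's disk becomes a move source once its free space drops below 10% of its total, so a 40M jbod from the test stand below starts shedding parts when less than 4M remains free. The synthesized default policy is built with 0.0, meaning background moves never trigger for tables that stay on the implicit default disk.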

View File

@ -38,6 +38,9 @@ class Reservation;
using ReservationPtr = std::unique_ptr<Reservation>;
/** Space.
* Provides an interface for reservation.
*/
class Space : public std::enable_shared_from_this<Space>
{
public:
@ -109,7 +112,7 @@ public:
bool try_reserve(UInt64 bytes) const
{
auto avaliable_space = getAvailableSpace();
auto available_space = getAvailableSpace();
std::lock_guard lock(mutex);
if (bytes == 0)
{
@ -117,9 +120,9 @@ public:
++reservation_count;
return true;
}
avaliable_space -= std::min(avaliable_space, reserved_bytes);
LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Unreserved " << avaliable_space << " , to reserve " << bytes << " on disk " << name);
if (avaliable_space >= bytes)
available_space -= std::min(available_space, reserved_bytes);
LOG_DEBUG(&Logger::get("DiskSpaceMonitor"), "Unreserved " << available_space << " , to reserve " << bytes << " on disk " << name);
if (available_space >= bytes)
{
++reservation_count;
reserved_bytes += bytes;
@ -160,10 +163,10 @@ public:
UInt64 getUnreservedSpace() const
{
auto avaliable_space = getSpaceInformation().getAvailableSpace();
auto available_space = getSpaceInformation().getAvailableSpace();
std::lock_guard lock(mutex);
avaliable_space -= std::min(avaliable_space, reserved_bytes);
return avaliable_space;
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}
private:
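
Worked example of the accounting above: if the filesystem reports 50 GiB available and 10 GiB is already promised to open reservations, getUnreservedSpace returns 50 - min(50, 10) = 40 GiB; the min() guards the unsigned subtraction when reservations temporarily exceed what the filesystem currently reports.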
@ -181,6 +184,11 @@ private:
using DiskPtr = std::shared_ptr<const Disk>;
using Disks = std::vector<DiskPtr>;
/** Reservation.
* Contains information about the disk and the size of the reservation.
* Unreserves on destroy.
*/
class Reservation : private boost::noncopyable
{
public:
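
Because a Reservation unreserves in its destructor, callers hold it only for the lifetime of the write. A hedged usage sketch — reserve and getDisk are assumed to exist on Space and Reservation with roughly these shapes:

{
    ReservationPtr reservation = volume->reserve(part_size);  // assumption: returns nullptr on failure
    if (!reservation)
        throw Exception("Not enough space", ErrorCodes::NOT_ENOUGH_SPACE);
    writePart(reservation->getDisk());  // hypothetical write to the reserved disk
}   /// destructor runs here and returns part_size to the unreserved pool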
@ -266,6 +274,9 @@ private:
};
/** Volume.
* Contains a set of "equivalent" disks.
*/
class Volume : public Space
{
friend class StoragePolicy;
@ -319,6 +330,9 @@ using VolumePtr = std::shared_ptr<const Volume>;
using Volumes = std::vector<VolumePtr>;
/** Policy.
* Contains an ordered set of Volumes.
*/
class StoragePolicy : public Space
{
public:
@ -326,7 +340,7 @@ public:
StoragePolicy(String name_, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix,
const DiskSelector & disks);
StoragePolicy(String name_, Volumes volumes_) : volumes(std::move(volumes_)), name(std::move(name_))
StoragePolicy(String name_, Volumes volumes_, double move_factor_) : volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
{
if (volumes.empty())
throw Exception("StoragePolicy must contain at least one Volume", ErrorCodes::UNKNOWN_POLICY);
@ -376,6 +390,8 @@ public:
const auto & getVolumes() const { return volumes; }
auto getMoveFactor() const { return move_factor; }
VolumePtr getVolume(size_t i) const { return (i < volumes.size() ? volumes[i] : VolumePtr()); }
VolumePtr getVolumeByName(const String & volume_name) const
@ -390,6 +406,7 @@ private:
Volumes volumes;
String name;
std::map<String, size_t> volumes_names;
double move_factor;
};
using StoragePolicyPtr = std::shared_ptr<const StoragePolicy>;
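
Putting the new accessors together, a hedged consumer-side sketch; the policy and volume names are assumptions for illustration:

auto policy = context.getStoragePolicy("jbods_with_external");  // hypothetical policy name
double factor = policy->getMoveFactor();                        // 0.1 unless set in config
VolumePtr external = policy->getVolumeByName("external");       // lookup declared above
VolumePtr first = policy->getVolume(0);                         // by position in config order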

View File

@ -165,7 +165,7 @@ MergeTreeData::MergeTreeData(
setTTLExpressions(columns_.getColumnTTLs(), ttl_table_ast_);
// format_file always contained on default disk
// format_file is always present on every data path
String version_file_path;
/// Creating directories, if not exist.
@ -1596,7 +1596,6 @@ void MergeTreeData::alterDataPart(
*/
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
///@TODO_IGR ASK Why dont we use part->relative_path?
MergedColumnOnlyOutputStream out(
*this,
in.getHeader(),
@ -2773,7 +2772,7 @@ MergeTreeData::getDetachedParts() const
DiskSpace::ReservationPtr MergeTreeData::reserveSpaceForPart(UInt64 expected_size)
{
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; ///@TODO_IGR ASK Is it OK?
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; /// 1MB
constexpr UInt64 RESERVATION_MULTIPLY_ESTIMATION_FACTOR = 1;
if (expected_size < RESERVATION_MIN_ESTIMATION_SIZE)
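
With the constants spelled out: an expected part size of 200 KB is bumped up to the 1 MB floor before reserving, while anything at or above 1 MB is reserved as-is, since the multiply factor is 1.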

View File

@ -321,13 +321,25 @@ bool MergeTreeDataMergerMutator::selectPartsToMove(
std::unordered_map<DiskSpace::DiskPtr, MinSumMinElems<MergeTreeData::DataPartPtr>> need_to_move;
const auto & policy = data.getStoragePolicy();
for (const auto & disk : policy->getDisks())
const auto & volumes = policy->getVolumes();
/// Nothing to move if the policy has only one volume
if (volumes.size() == 1)
{
auto space_information = disk->getSpaceInformation();
///@TODO_IGR move constant to configuration
if (space_information.getTotalSpace() * 0.1 > space_information.getAvailableSpace())
return false;
}
/// Do not check the last volume: parts from it have nowhere further to go
for (size_t i = 0; i != volumes.size() - 1; ++i) {
for (const auto & disk : volumes[i]->disks)
{
need_to_move.emplace(disk, space_information.getTotalSpace() * 0.1 - space_information.getAvailableSpace());
auto space_information = disk->getSpaceInformation();
/// Do not take into account reserved space
if (space_information.getTotalSpace() * policy->getMoveFactor() > space_information.getAvailableSpace())
{
need_to_move.emplace(disk, space_information.getTotalSpace() * policy->getMoveFactor() - space_information.getAvailableSpace());
}
}
}
@ -342,14 +354,14 @@ bool MergeTreeDataMergerMutator::selectPartsToMove(
for (auto && move : need_to_move)
{
auto volume_priority = policy->getVolumePriorityByDisk(move.first);
auto min_volume_priority = policy->getVolumePriorityByDisk(move.first) + 1;
for (auto && part : move.second.getElems())
{
auto reservation = policy->reserve(part->bytes_on_disk, volume_priority);
auto reservation = policy->reserve(part->bytes_on_disk, min_volume_priority);
if (!reservation)
{
/// The next parts to move from this disk have a greater size and the same min volume priority
/// There are no space for thems
/// There is no space for them
/// But it may still be possible to move data from other disks
break;
}
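
A worked pass through the new selection logic: for a two-volume policy with move_factor 0.1, a 100 GB disk on the first volume is skipped while at least 10 GB is free; at 7 GB free it is queued with 100 * 0.1 - 7 = 3 GB to move. Each queued part is then reserved at priority current + 1, i.e. only volumes after the part's current one are considered, which is what pushes data down the hot-to-cold volume order.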

View File

@ -307,7 +307,6 @@ public:
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
/// @TODO_IGR BUG Fix here. When mutation use old path!!!
reserved_space = storage.reserveSpaceForPart(total_size);
if (!reserved_space)
throw Exception("Not enough space for merging parts", ErrorCodes::NOT_ENOUGH_SPACE);
@ -649,76 +648,28 @@ bool StorageMergeTree::merge(
bool StorageMergeTree::move_parts()
{
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
/// You must call destructor with unlocked `currently_merging_mutex`.
std::optional<CurrentlyMovingPartsTagger> moving_tagger;
{
std::lock_guard lock(currently_merging_mutex);
auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
MergeTreeMovingParts parts_to_move;
auto can_move = [this] (const DataPartPtr & part, String *)
{
return !currently_merging.count(part);
};
std::lock_guard lock(currently_merging_mutex);
if (!merger_mutator.selectPartsToMove(parts_to_move, can_move))
return false;
auto can_move = [this](const DataPartPtr & part, String *)
{
return !currently_merging.count(part);
};
moving_tagger.emplace(std::move(parts_to_move), *this);
if (!merger_mutator.selectPartsToMove(parts_to_move, can_move))
return false;
moving_tagger.emplace(std::move(parts_to_move), *this);
}
}
// MergeList::EntryPtr merge_entry = global_context.getMergeList().insert(database_name, table_name, future_part);
// /// Logging
// Stopwatch stopwatch;
// MutableDataPartPtr new_part;
// auto write_part_log = [&] (const ExecutionStatus & execution_status)
// {
// try
// {
// auto part_log = global_context.getPartLog(database_name);
// if (!part_log)
// return;
//
// PartLogElement part_log_elem;
//
// part_log_elem.event_type = PartLogElement::MERGE_PARTS;
// part_log_elem.event_time = time(nullptr);
// part_log_elem.duration_ms = stopwatch.elapsed() / 1000000;
//
// part_log_elem.database_name = database_name;
// part_log_elem.table_name = table_name;
// part_log_elem.partition_id = future_part.part_info.partition_id;
// part_log_elem.part_name = future_part.name;
//
// if (new_part)
// part_log_elem.bytes_compressed_on_disk = new_part->bytes_on_disk;
//
// part_log_elem.source_part_names.reserve(future_part.parts.size());
// for (const auto & source_part : future_part.parts)
// part_log_elem.source_part_names.push_back(source_part->name);
//
// part_log_elem.rows_read = (*merge_entry)->rows_read;
// part_log_elem.bytes_read_uncompressed = (*merge_entry)->bytes_read_uncompressed;
//
// part_log_elem.rows = (*merge_entry)->rows_written;
// part_log_elem.bytes_uncompressed = (*merge_entry)->bytes_written_uncompressed;
//
// part_log_elem.error = static_cast<UInt16>(execution_status.code);
// part_log_elem.exception = execution_status.message;
//
// part_log->add(part_log_elem);
// }
// catch (...)
// {
// tryLogCurrentException(log, __PRETTY_FUNCTION__);
// }
// };
auto copied_parts = merger_mutator.cloneParts(moving_tagger->parts);
for (auto && copied_part : copied_parts)
@ -726,7 +677,7 @@ bool StorageMergeTree::move_parts()
auto part = getActiveContainingPart(copied_part->name);
if (!part || part->name != copied_part->name)
{
///@TODO_IGR LOG here that original part does not exists after move
/// Original part does not exist after move. This is a bug; it was probably merged in the meantime.
continue;
}
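
Since the interleaved before/after lines above are hard to read, here is a condensed sketch of the new shape of move_parts: the structure lock is taken before the merge mutex, and the tagger outlives the locked scope so its destructor runs with currently_merging_mutex unlocked (condensed from the added lines, not verbatim):

bool StorageMergeTree::move_parts()
{
    auto table_lock_holder = lockStructureForShare(true, RWLockImpl::NO_QUERY);
    MergeTreeMovingParts parts_to_move;

    /// Must be destroyed with `currently_merging_mutex` unlocked.
    std::optional<CurrentlyMovingPartsTagger> moving_tagger;
    {
        std::lock_guard lock(currently_merging_mutex);
        auto can_move = [this](const DataPartPtr & part, String *)
        {
            return !currently_merging.count(part);
        };
        if (!merger_mutator.selectPartsToMove(parts_to_move, can_move))
            return false;
        moving_tagger.emplace(std::move(parts_to_move), *this);
    }
    auto copied_parts = merger_mutator.cloneParts(moving_tagger->parts);
    /// ... for each cloned part: find the active original, skip if it vanished, swap in
    return true;  // assumption: success path
}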

View File

@ -45,7 +45,7 @@ StorageSystemParts::StorageSystemParts(const std::string & name)
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeString>()}, ///@TODO_IGR ASK is is OK?
{"disk_name", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"hash_of_all_files", std::make_shared<DataTypeString>()},

View File

@ -43,7 +43,7 @@ StorageSystemPartsColumns::StorageSystemPartsColumns(const std::string & name)
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"engine", std::make_shared<DataTypeString>()},
{"disk_name", std::make_shared<DataTypeString>()}, ///@TODO_IGR ASK is is OK?
{"disk_name", std::make_shared<DataTypeString>()},
{"path", std::make_shared<DataTypeString>()},
{"column", std::make_shared<DataTypeString>()},

View File

@ -18,9 +18,6 @@ node2 = cluster.add_instance('node2',
tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
macros={"shard": 0, "replica": 2} )
# node2 = cluster.add_instance('node2',
# main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'],
# with_zookeeper=True)
@pytest.fixture(scope="module")
def test_cluster():
@ -44,9 +41,6 @@ def test_cluster():
cluster.shutdown()
# def test_run_shell(test_cluster):
# test_cluster.open_bash_shell('node1')
# Check that configuration is valid
def test_config(test_cluster):
assert node1.query("select name, path, keep_free_space from system.disks") == "default\t/var/lib/clickhouse/data/\t1000\nexternal\t/external/\t0\njbod1\t/jbod1/\t10000000\njbod2\t/jbod2/\t10000000\n"
@ -98,7 +92,6 @@ def test_move(test_cluster):
assert node2.query("insert into node1_move_mt values (1)") == ""
assert node2.query("select disk_name from system.parts where table == 'node1_move_mt'") == "default\n"
test_cluster.open_bash_shell('node2')
# move from default to external
assert node2.query("alter table node1_move_mt move PART 'all_1_1_0' to disk 'external'") == ""
assert node2.query("select disk_name from system.parts where table == 'node1_move_mt'") == "external\n"
@ -125,119 +118,10 @@ def test_no_policy(test_cluster):
assert str(e).strip().split("\n")[1].find("Unknown StoragePolicy name_that_does_not_exists") != -1
#################################
# root@node1:/# clickhouse client -m
# ClickHouse client version 19.8.1.536.
# Connecting to localhost:9000 as user default.
# Connected to ClickHouse server version 19.8.1 revision 54420.
# node1 :) select * from system.disks;
# def test_same_credentials(same_credentials_cluster):
# node1.query("insert into test_table values ('2017-06-16', 111, 0)")
# time.sleep(1)
# assert node1.query("SELECT id FROM test_table order by id") == '111\n'
# assert node2.query("SELECT id FROM test_table order by id") == '111\n'
# node2.query("insert into test_table values ('2017-06-17', 222, 1)")
# time.sleep(1)
# assert node1.query("SELECT id FROM test_table order by id") == '111\n222\n'
# assert node2.query("SELECT id FROM test_table order by id") == '111\n222\n'
# node3 = cluster.add_instance('node3', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
# node4 = cluster.add_instance('node4', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
# @pytest.fixture(scope="module")
# def no_credentials_cluster():
# try:
# cluster.start()
# _fill_nodes([node3, node4], 2)
# yield cluster
# finally:
# cluster.shutdown()
# def test_no_credentials(no_credentials_cluster):
# node3.query("insert into test_table values ('2017-06-18', 111, 0)")
# time.sleep(1)
# assert node3.query("SELECT id FROM test_table order by id") == '111\n'
# assert node4.query("SELECT id FROM test_table order by id") == '111\n'
# node4.query("insert into test_table values ('2017-06-19', 222, 1)")
# time.sleep(1)
# assert node3.query("SELECT id FROM test_table order by id") == '111\n222\n'
# assert node4.query("SELECT id FROM test_table order by id") == '111\n222\n'
# node5 = cluster.add_instance('node5', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
# node6 = cluster.add_instance('node6', main_configs=['configs/remote_servers.xml', 'configs/credentials2.xml'], with_zookeeper=True)
# @pytest.fixture(scope="module")
# def different_credentials_cluster():
# try:
# cluster.start()
# _fill_nodes([node5, node6], 3)
# yield cluster
# finally:
# cluster.shutdown()
# def test_different_credentials(different_credentials_cluster):
# node5.query("insert into test_table values ('2017-06-20', 111, 0)")
# time.sleep(1)
# assert node5.query("SELECT id FROM test_table order by id") == '111\n'
# assert node6.query("SELECT id FROM test_table order by id") == ''
# node6.query("insert into test_table values ('2017-06-21', 222, 1)")
# time.sleep(1)
# assert node5.query("SELECT id FROM test_table order by id") == '111\n'
# assert node6.query("SELECT id FROM test_table order by id") == '222\n'
# node7 = cluster.add_instance('node7', main_configs=['configs/remote_servers.xml', 'configs/credentials1.xml'], with_zookeeper=True)
# node8 = cluster.add_instance('node8', main_configs=['configs/remote_servers.xml', 'configs/no_credentials.xml'], with_zookeeper=True)
# @pytest.fixture(scope="module")
# def credentials_and_no_credentials_cluster():
# try:
# cluster.start()
# _fill_nodes([node7, node8], 4)
# yield cluster
# finally:
# cluster.shutdown()
# def test_credentials_and_no_credentials(credentials_and_no_credentials_cluster):
# node7.query("insert into test_table values ('2017-06-21', 111, 0)")
# time.sleep(1)
# assert node7.query("SELECT id FROM test_table order by id") == '111\n'
# assert node8.query("SELECT id FROM test_table order by id") == ''
# node8.query("insert into test_table values ('2017-06-22', 222, 1)")
# time.sleep(1)
# assert node7.query("SELECT id FROM test_table order by id") == '111\n'
# assert node8.query("SELECT id FROM test_table order by id") == '222\n'
'''
## Test stand for multiple disks feature
Currently for manual tests, can be easily scripted to be the part of intergration tests.
Currently for manual tests; it can easily be scripted to become part of the integration tests.
To run you need to have docker & docker-compose.