Fixed integration test_tmp_policy

This commit is contained in:
Gleb Novikov 2020-07-08 17:25:23 +03:00 committed by Vladimir Chebotarev
parent 99d52552e1
commit aac97957ce
8 changed files with 32 additions and 22 deletions

View File

@ -32,6 +32,7 @@ public:
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
// TODO: Remove getNextDisk, move it's behaviour to VolumeJBOD::getDisk(), if it was called without argument.
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.

View File

@ -765,7 +765,17 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& worth_convert_to_two_level)
{
size_t size = current_memory_usage + params.min_free_disk_space;
const std::string tmp_path = params.tmp_volume->getDisk()->getPath();
std::string tmp_path;
VolumeJBODPtr vol;
if ((vol = dynamic_pointer_cast<VolumeJBOD>(params.tmp_volume)) != nullptr)
{
tmp_path = vol->getNextDisk()->getPath();
}
else
{
tmp_path = params.tmp_volume->getDisk()->getPath();
}
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all

View File

@ -877,7 +877,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
VolumeSingleDiskPtr tmp_volume;
VolumePtr tmp_volume;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
@ -890,7 +890,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumeSingleDiskPtr tmp_volume_, size_t max_threads_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),

View File

@ -319,7 +319,7 @@ struct ContextShared
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumeSingleDiskPtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
@ -547,7 +547,7 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
VolumeSingleDiskPtr Context::getTemporaryVolume() const
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
@ -572,7 +572,7 @@ void Context::setPath(const String & path)
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
}
VolumeSingleDiskPtr Context::setTemporaryStorage(const String & path, const String & policy_name)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
std::lock_guard lock(shared->storage_policies_mutex);
@ -590,8 +590,7 @@ VolumeSingleDiskPtr Context::setTemporaryStorage(const String & path, const Stri
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
auto tmp_vol = tmp_policy->getVolume(0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>(tmp_vol->getName() + "_tmp_volume", tmp_vol->getDisk());
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (shared->tmp_volume->getDisks().empty())

View File

@ -108,8 +108,8 @@ using StoragePolicySelectorPtr = std::shared_ptr<const StoragePolicySelector>;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class SingleDiskVolume;
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
struct NamedSession;
@ -227,14 +227,14 @@ public:
String getUserFilesPath() const;
String getDictionariesLibPath() const;
VolumeSingleDiskPtr getTemporaryVolume() const;
VolumePtr getTemporaryVolume() const;
void setPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
VolumeSingleDiskPtr setTemporaryStorage(const String & path, const String & policy_name = "");
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -16,8 +16,8 @@ class TableJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class SingleDiskVolume;
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
struct SortedBlocksWriter
{
@ -63,7 +63,7 @@ struct SortedBlocksWriter
std::mutex insert_mutex;
std::condition_variable flush_condvar;
const SizeLimits & size_limits;
VolumeSingleDiskPtr volume;
VolumePtr volume;
Block sample_block;
const SortDescription & sort_description;
Blocks inserted_blocks;
@ -76,7 +76,7 @@ struct SortedBlocksWriter
size_t flush_number = 0;
size_t flush_inflight = 0;
SortedBlocksWriter(const SizeLimits & size_limits_, VolumeSingleDiskPtr volume_, const Block & sample_block_,
SortedBlocksWriter(const SizeLimits & size_limits_, VolumePtr volume_, const Block & sample_block_,
const SortDescription & description, size_t rows_in_block_, size_t num_files_to_merge_, const String & codec_)
: size_limits(size_limits_)
, volume(volume_)

View File

@ -14,7 +14,7 @@
namespace DB
{
TableJoin::TableJoin(const Settings & settings, VolumeSingleDiskPtr tmp_volume_)
TableJoin::TableJoin(const Settings & settings, VolumePtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)

View File

@ -24,8 +24,8 @@ class DictionaryReader;
struct Settings;
class SingleDiskVolume;
using VolumeSingleDiskPtr = std::shared_ptr<SingleDiskVolume>;
class IVolume;
using VolumePtr = std::shared_ptr<IVolume>;
class TableJoin
{
@ -71,11 +71,11 @@ class TableJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
VolumeSingleDiskPtr tmp_volume;
VolumePtr tmp_volume;
public:
TableJoin() = default;
TableJoin(const Settings &, VolumeSingleDiskPtr tmp_volume);
TableJoin(const Settings &, VolumePtr tmp_volume);
/// for StorageJoin
TableJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
@ -97,7 +97,7 @@ public:
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
bool sameStrictnessAndKind(ASTTableJoin::Strictness, ASTTableJoin::Kind) const;
const SizeLimits & sizeLimits() const { return size_limits; }
VolumeSingleDiskPtr getTemporaryVolume() { return tmp_volume; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool allowMergeJoin() const;
bool allowDictJoin(const String & dict_key, const Block & sample_block, Names &, NamesAndTypesList &) const;
bool preferMergeJoin() const { return join_algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE; }