Merge branch 'master' of https://github.com/weiqxu/ClickHouse into weiqxu-master

Alexey Milovidov 2019-08-27 21:52:09 +03:00
commit 5ca8f8d695
11 changed files with 55 additions and 18 deletions

View File

@@ -352,6 +352,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13") \
M(SettingBool, compile, false, "Whether query compilation is enabled. Will be removed after 2020-03-13") \
M(SettingUInt64, min_free_disk_space, 0, "The minimum disk space to keep") \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)
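The new entry above declares a UInt64 setting that defaults to 0, so the free-space check it controls is effectively disabled unless a threshold is configured. As a rough, simplified sketch of what the M(...) line amounts to once the SettingsCollection macros are expanded (the real macros also register the name and description so the setting is visible to SET and system.settings):

    struct Settings : public SettingsCollection<Settings>
    {
        // ...
        SettingUInt64 min_free_disk_space {0};   /// "The minimum disk space to keep"
        // ...
    };

    /// which is why the hunks below can read the value simply as
    ///     settings.min_free_disk_space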

View File

@@ -21,10 +21,11 @@ namespace DB
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
header = children.at(0)->getHeader();
@@ -77,6 +78,11 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
auto freeSpace = Poco::File(tmp_path).freeSpace();
if (min_free_disk_space > freeSpace - sum_bytes_in_blocks)
{
throw Exception("Not enough space.", ErrorCodes::NOT_ENOUGH_SPACE);
}
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();
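One caveat about the guard above: freeSpace and sum_bytes_in_blocks are both unsigned, so when the data about to be spilled already exceeds the free space, the subtraction wraps around to a huge value and the exception is never thrown. A minimal sketch of a wrap-around-safe formulation of the same invariant (keep at least min_free_disk_space bytes free after the spill), using the names from the surrounding code and an illustrative message:

    /// Intended invariant: after writing sum_bytes_in_blocks to tmp_path,
    /// at least min_free_disk_space bytes must remain free.
    UInt64 free_space = Poco::File(tmp_path).freeSpace();
    if (free_space < sum_bytes_in_blocks
        || free_space - sum_bytes_in_blocks < min_free_disk_space)
        throw Exception("Not enough free disk space in temporary path.", ErrorCodes::NOT_ENOUGH_SPACE);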

View File

@@ -18,6 +18,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
/** Merges a stream of individually sorted blocks into a single, fully sorted stream of blocks.
 * If there is too much data to sort, external sorting with temporary files can be used.
*/
@@ -73,7 +77,8 @@ public:
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
@@ -93,6 +98,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");
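The extern const int NOT_ENOUGH_SPACE; added above follows the usual ClickHouse error-code pattern: the constant is defined exactly once, with its numeric value, in the central list (dbms/src/Common/ErrorCodes.cpp at the time of this commit) and is only declared via extern where it is thrown. Roughly, with a placeholder value that is not taken from the repository:

    namespace DB
    {
    namespace ErrorCodes
    {
        /// Defined once in the central error-code list; headers such as the one
        /// above only declare it.
        extern const int NOT_ENOUGH_SPACE = 999;   /// placeholder value for illustration
    }
    }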

View File

@@ -639,6 +639,11 @@ bool Aggregator::executeOnBlock(const Block & block, AggregatedDataVariants & re
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
auto freeSpace = Poco::File(params.tmp_path).freeSpace();
if (params.min_free_disk_space > freeSpace - current_memory_usage)
{
throw Exception("Not enough space.", ErrorCodes::NOT_ENOUGH_SPACE);
}
writeToTemporaryFile(result);
}
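This is the second copy of the free-space guard (the sort path above carries one and MergeSortingTransform below adds a third). A hypothetical helper, not part of this commit and with an invented name, that the three call sites could share:

    /// Hypothetical shared guard: throw if writing bytes_to_write into tmp_path
    /// would leave less than min_free_disk_space bytes free.
    static void checkFreeDiskSpace(const std::string & tmp_path, size_t bytes_to_write, size_t min_free_disk_space)
    {
        UInt64 free_space = Poco::File(tmp_path).freeSpace();
        if (free_space < bytes_to_write || free_space - bytes_to_write < min_free_disk_space)
            throw Exception("Not enough space.", ErrorCodes::NOT_ENOUGH_SPACE);
    }

    /// The call site above would then reduce to, e.g.:
    ///     checkFreeDiskSpace(params.tmp_path, current_memory_usage, params.min_free_disk_space);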

View File

@@ -39,6 +39,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int NOT_ENOUGH_SPACE;
}
class IBlockOutputStream;
@@ -796,6 +797,7 @@ public:
/// The settings are used to determine the cache size. No threads are created.
size_t max_threads;
const size_t min_free_disk_space;
Params(
const Block & src_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_,
@@ -803,21 +805,23 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_, size_t max_threads_)
const std::string & tmp_path_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
overflow_row(overflow_row_), max_rows_to_group_by(max_rows_to_group_by_), group_by_overflow_mode(group_by_overflow_mode_),
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_), max_threads(max_threads_)
tmp_path(tmp_path_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
{
intermediate_header = intermediate_header_;
}
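For reference, a typical call site for the extended Params mirrors the InterpreterSelectQuery::executeAggregation change further down in this diff; the local names here (header, keys, aggregates, overflow_row) stand in for whatever the caller has at hand, so this is a sketch rather than code from the commit:

    Aggregator::Params params(
        header, keys, aggregates, overflow_row,
        settings.max_rows_to_group_by, settings.group_by_overflow_mode,
        settings.group_by_two_level_threshold, settings.group_by_two_level_threshold_bytes,
        settings.max_bytes_before_external_group_by,
        settings.empty_result_for_aggregation_by_empty_set,
        context.getTemporaryPath(), settings.max_threads,
        settings.min_free_disk_space);   /// new trailing argument; the merge-only constructor above passes 0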

View File

@@ -1657,7 +1657,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@@ -1723,7 +1723,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@@ -1943,7 +1943,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
@@ -1972,7 +1972,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context.getTemporaryPath(), settings.max_threads);
context.getTemporaryPath(), settings.max_threads, settings.min_free_disk_space);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
@@ -2073,7 +2073,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, SortingInfoPtr so
pipeline.firstStream() = std::make_shared<MergeSortingBlockInputStream>(
pipeline.firstStream(), order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space);
}
}
@@ -2111,7 +2111,7 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, SortingInfoP
return std::make_shared<MergeSortingTransform>(
header, order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort, context.getTemporaryPath());
settings.max_bytes_before_external_sort, context.getTemporaryPath(), settings.min_free_disk_space);
});
}
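All of the call sites above simply forward settings.min_free_disk_space from the query context. A minimal sketch, assuming the existing Context::setSetting interface, of raising the threshold for one query's context before the interpreter is built (the 10 GiB figure is only an example):

    Context query_context = global_context;   /// per-query copy of the context
    query_context.setSetting("min_free_disk_space", UInt64(10ULL * 1024 * 1024 * 1024));   /// keep roughly 10 GiB free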

View File

@@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1);
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
Aggregator aggregator(params);

View File

@@ -236,11 +236,13 @@ MergeSortingTransform::MergeSortingTransform(
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_)
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_)
: IProcessor({header}, {header})
, description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
, min_free_disk_space(min_free_disk_space_)
{
auto & sample = inputs.front().getHeader();
@@ -504,6 +506,11 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
auto freeSpace = Poco::File(tmp_path).freeSpace();
if (min_free_disk_space > freeSpace - sum_bytes_in_blocks)
{
throw Exception("Not enough space.", ErrorCodes::NOT_ENOUGH_SPACE);
}
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
const std::string & path = temporary_files.back()->path();

View File

@@ -14,6 +14,10 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
class MergeSorter;
class MergeSortingTransform : public IProcessor
@@ -24,7 +28,8 @@ public:
SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_);
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t min_free_disk_space_);
~MergeSortingTransform() override;
@@ -44,6 +49,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@@ -229,7 +229,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0 /// min_free_disk_space
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);
@@ -301,7 +302,8 @@ try
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
1 /// max_threads
1, /// max_threads
0 /// min_free_disk_space
);
auto agg_params = std::make_shared<AggregatingTransformParams>(params, /* final =*/ false);

View File

@@ -133,7 +133,7 @@ try
SortDescription description = {{0, 1, 1}};
auto transform = std::make_shared<MergeSortingTransform>(
source->getPort().getHeader(), description,
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".");
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);
auto sink = std::make_shared<CheckSortedSink>();
connect(source->getPort(), transform->getInputs().front());