Merge branch 'master' of github.com:ClickHouse/ClickHouse

This commit is contained in:
alesapin 2019-12-12 11:50:48 +03:00
commit 7e205c55c0
39 changed files with 1245 additions and 134 deletions

View File

@ -128,6 +128,8 @@ Yu](https://github.com/yuzhichang))
* Introduce CustomSeparated data format that supports custom escaping and
delimiter rules. [#7118](https://github.com/ClickHouse/ClickHouse/pull/7118)
([tavplubix](https://github.com/tavplubix))
* Support Redis as source of external dictionary. [#4361](https://github.com/ClickHouse/ClickHouse/pull/4361) [#6962](https://github.com/ClickHouse/ClickHouse/pull/6962) ([comunodi](https://github.com/comunodi), [Anton
Popov](https://github.com/CurtizJ))
### Bug Fix
* Fix wrong query result if it has `WHERE IN (SELECT ...)` section and `optimize_read_in_order` is

View File

@ -14,6 +14,15 @@ String quoteString(const StringRef & x)
}
/// Returns `x` written through writeDoubleQuotedString into a fresh string.
String doubleQuoteString(const StringRef & x)
{
    String res(x.size, '\0');
    {
        /// The buffer lives in its own scope so that it is destroyed before `res` is returned.
        /// Otherwise the value handed to the caller depends on copy elision being applied to `res`.
        /// NOTE(review): assumes WriteBufferFromString adjusts the size of `res` when it is destroyed - confirm.
        WriteBufferFromString wb(res);
        writeDoubleQuotedString(x, wb);
    }
    return res;
}
String backQuote(const StringRef & x)
{
String res(x.size, '\0');

View File

@ -9,6 +9,9 @@ namespace DB
/// Quote the string.
String quoteString(const StringRef & x);
/// Double quote the string.
String doubleQuoteString(const StringRef & x);
/// Quote the identifier with backquotes.
String backQuote(const StringRef & x);

View File

@ -85,6 +85,8 @@ Block TTLBlockInputStream::readImpl()
removeValuesWithExpiredColumnTTL(block);
updateMovesTTL(block);
return block;
}
@ -145,7 +147,8 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
defaults_expression->execute(block_with_defaults);
}
for (const auto & [name, ttl_entry] : storage.ttl_entries_by_name)
std::vector<String> columns_to_remove;
for (const auto & [name, ttl_entry] : storage.column_ttl_entries_by_name)
{
const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
auto & new_ttl_info = new_ttl_infos.columns_ttl[name];
@ -159,7 +162,10 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
continue;
if (!block.has(ttl_entry.result_column))
{
columns_to_remove.push_back(ttl_entry.result_column);
ttl_entry.expression->execute(block);
}
ColumnPtr default_column = nullptr;
if (block_with_defaults.has(name))
@ -192,9 +198,34 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
column_with_type.column = std::move(result_column);
}
for (const auto & elem : storage.ttl_entries_by_name)
if (block.has(elem.second.result_column))
block.erase(elem.second.result_column);
for (const String & column : columns_to_remove)
block.erase(column);
}
/// Feeds the TTL timestamp of every row of `block` into new_ttl_infos.moves_ttl,
/// for each move TTL rule of the storage (keyed by the rule's result column name).
void TTLBlockInputStream::updateMovesTTL(Block & block)
{
    /// Result columns that were computed here only; they are erased from the block before returning.
    std::vector<String> temporary_columns;

    for (const auto & move_entry : storage.move_ttl_entries)
    {
        const String & ttl_column_name = move_entry.result_column;
        auto & move_ttl_info = new_ttl_infos.moves_ttl[ttl_column_name];

        if (!block.has(ttl_column_name))
        {
            temporary_columns.push_back(ttl_column_name);
            move_entry.expression->execute(block);
        }

        const IColumn * ttl_values = block.getByName(ttl_column_name).column.get();

        const size_t rows = block.rows();
        for (size_t row = 0; row < rows; ++row)
        {
            UInt32 row_ttl = getTimestampByIndex(ttl_values, row);
            move_ttl_info.update(row_ttl);
        }
    }

    for (const String & name : temporary_columns)
        block.erase(name);
}
UInt32 TTLBlockInputStream::getTimestampByIndex(const IColumn * column, size_t ind)

View File

@ -58,6 +58,9 @@ private:
/// Removes rows with expired table ttl and computes new ttl_infos for part
void removeRowsWithExpiredTableTTL(Block & block);
/// Updates TTL for moves
void updateMovesTTL(Block & block);
UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
bool isTTLExpired(time_t ttl);
};

View File

@ -1508,7 +1508,18 @@ BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
shared->background_move_pool.emplace(settings.background_move_pool_size, "BackgroundMovePool", "BgMoveProcPool");
{
BackgroundProcessingPool::PoolSettings pool_settings;
auto & config = getConfigRef();
pool_settings.thread_sleep_seconds = config.getDouble("background_move_processing_pool_thread_sleep_seconds", 10);
pool_settings.thread_sleep_seconds_random_part = config.getDouble("background_move_processing_pool_thread_sleep_seconds_random_part", 1.0);
pool_settings.thread_sleep_seconds_if_nothing_to_do = config.getDouble("background_move_processing_pool_thread_sleep_seconds_if_nothing_to_do", 0.1);
pool_settings.task_sleep_seconds_when_no_work_min = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_min", 10);
pool_settings.task_sleep_seconds_when_no_work_max = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_max", 600);
pool_settings.task_sleep_seconds_when_no_work_multiplier = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_multiplier", 1.1);
pool_settings.task_sleep_seconds_when_no_work_random_part = config.getDouble("background_move_processing_pool_task_sleep_seconds_when_no_work_random_part", 1.0);
shared->background_move_pool.emplace(settings.background_move_pool_size, pool_settings, "BackgroundMovePool", "BgMoveProcPool");
}
return *shared->background_move_pool;
}

View File

@ -176,12 +176,14 @@ void ASTAlterCommand::formatImpl(
settings.ostr << " TO ";
switch (move_destination_type)
{
case MoveDestinationType::DISK:
case PartDestinationType::DISK:
settings.ostr << "DISK ";
break;
case MoveDestinationType::VOLUME:
case PartDestinationType::VOLUME:
settings.ostr << "VOLUME ";
break;
default:
break;
}
settings.ostr << quoteString(move_destination_name);
}

View File

@ -3,6 +3,7 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTQueryWithOnCluster.h>
#include <Parsers/ASTTTLElement.h>
namespace DB
@ -128,15 +129,9 @@ public:
bool if_exists = false; /// option for DROP_COLUMN, MODIFY_COLUMN, COMMENT_COLUMN
enum MoveDestinationType
{
DISK,
VOLUME,
};
PartDestinationType move_destination_type; /// option for MOVE PART/PARTITION
MoveDestinationType move_destination_type;
String move_destination_name;
String move_destination_name; /// option for MOVE PART/PARTITION
/** For FETCH PARTITION - the path in ZK to the shard, from which to download the partition.
*/

View File

@ -0,0 +1,27 @@
#include <Columns/Collator.h>
#include <Common/quoteString.h>
#include <Parsers/ASTTTLElement.h>
namespace DB
{
/// Prints the first child (the TTL expression) followed by the destination clause, if any.
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
    children.front()->formatImpl(settings, state, frame);

    if (destination_type == PartDestinationType::DELETE)
    {
        /// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
        return;
    }

    /// Keyword part of the clause; stays null for a destination type that has no clause.
    const char * clause = nullptr;
    if (destination_type == PartDestinationType::DISK)
        clause = " TO DISK ";
    else if (destination_type == PartDestinationType::VOLUME)
        clause = " TO VOLUME ";

    if (clause)
        settings.ostr << clause << quoteString(destination_name);
}
}

View File

@ -0,0 +1,36 @@
#pragma once
#include <Parsers/IAST.h>
#include <Storages/MergeTree/PartDestinationType.h>
namespace DB
{
/** Element of TTL expression.
  * Carries the destination (type and name) of the element; formatImpl prints the first child
  * followed by the destination clause.
  */
class ASTTTLElement : public IAST
{
public:
    PartDestinationType destination_type;
    String destination_name;

    ASTTTLElement(PartDestinationType destination_type_, const String & destination_name_)
        : destination_type(destination_type_), destination_name(destination_name_)
    {
    }

    String getID(char) const override { return "TTLElement"; }

    /// Copies this node and then calls cloneChildren() on the copy.
    ASTPtr clone() const override
    {
        auto res = std::make_shared<ASTTTLElement>(*this);
        res->cloneChildren();
        return res;
    }

protected:
    void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -16,6 +16,7 @@
#include <Parsers/ASTAsterisk.h>
#include <Parsers/ASTQualifiedAsterisk.h>
#include <Parsers/ASTQueryParameter.h>
#include <Parsers/ASTTTLElement.h>
#include <Parsers/ASTOrderByElement.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTFunctionWithKeyValueArguments.h>
@ -1414,6 +1415,42 @@ bool ParserFunctionWithKeyValueArguments::parseImpl(Pos & pos, ASTPtr & node, Ex
return true;
}
/// Parses one TTL list element: `expr [TO DISK 'name' | TO VOLUME 'name' | DELETE]`.
/// On success `node` is an ASTTTLElement whose only child is the parsed expression.
/// When no destination clause is present the destination type is DELETE.
/// Returns false if the expression cannot be parsed, or if TO DISK / TO VOLUME
/// is not followed by a string literal.
bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserKeyword s_to_disk("TO DISK");
    ParserKeyword s_to_volume("TO VOLUME");
    ParserKeyword s_delete("DELETE");
    ParserStringLiteral parser_string_literal;
    ParserExpression parser_exp;

    ASTPtr expr_elem;
    if (!parser_exp.parse(pos, expr_elem, expected))
        return false;

    PartDestinationType destination_type = PartDestinationType::DELETE;
    String destination_name;

    /// `expected` is forwarded to the keyword parsers so that a syntax error right after
    /// the expression can list these keywords among the expected tokens.
    if (s_to_disk.ignore(pos, expected))
        destination_type = PartDestinationType::DISK;
    else if (s_to_volume.ignore(pos, expected))
        destination_type = PartDestinationType::VOLUME;
    else
        s_delete.ignore(pos, expected); /// The DELETE keyword is optional.

    if (destination_type == PartDestinationType::DISK || destination_type == PartDestinationType::VOLUME)
    {
        ASTPtr ast_space_name;
        if (!parser_string_literal.parse(pos, ast_space_name, expected))
            return false;

        destination_name = ast_space_name->as<ASTLiteral &>().value.get<const String &>();
    }

    node = std::make_shared<ASTTTLElement>(destination_type, destination_name);
    node->children.push_back(expr_elem);
    return true;
}
bool ParserIdentifierWithOptionalParameters::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserIdentifier non_parametric;

View File

@ -320,4 +320,14 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
/** Element of TTL expression - same as expression element, but in addition,
* TO DISK 'xxx' | TO VOLUME 'xxx' | DELETE could be specified
*/
class ParserTTLElement : public IParserBase
{
protected:
const char * getName() const { return "element of TTL expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
}

View File

@ -557,6 +557,13 @@ bool ParserOrderByExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected &
}
/// Parses a list of TTL elements (ParserTTLElement) separated by comma tokens.
bool ParserTTLExpressionList::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
    ParserList ttl_elements(
        std::make_unique<ParserTTLElement>(),
        std::make_unique<ParserToken>(TokenType::Comma),
        false);

    return ttl_elements.parse(pos, node, expected);
}
bool ParserNullityChecking::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr node_comp;

View File

@ -386,6 +386,7 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// Parser for list of key-value pairs.
class ParserKeyValuePairsList : public IParserBase
{
@ -394,4 +395,12 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserTTLExpressionList : public IParserBase
{
protected:
const char * getName() const { return "ttl expression"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected);
};
}

View File

@ -87,6 +87,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
/* allow_empty = */ false);
ParserSetQuery parser_settings(true);
ParserNameList values_p;
ParserTTLExpressionList parser_ttl_list;
if (is_live_view)
{
@ -236,9 +237,9 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->part = true;
if (s_to_disk.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::DISK;
command->move_destination_type = PartDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::VOLUME;
command->move_destination_type = PartDestinationType::VOLUME;
else
return false;
@ -256,9 +257,9 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->type = ASTAlterCommand::MOVE_PARTITION;
if (s_to_disk.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::DISK;
command->move_destination_type = PartDestinationType::DISK;
else if (s_to_volume.ignore(pos))
command->move_destination_type = ASTAlterCommand::MoveDestinationType::VOLUME;
command->move_destination_type = PartDestinationType::VOLUME;
else
return false;
@ -431,7 +432,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
}
else if (s_modify_ttl.ignore(pos, expected))
{
if (!parser_exp_elem.parse(pos, command->ttl, expected))
if (!parser_ttl_list.parse(pos, command->ttl, expected))
return false;
command->type = ASTAlterCommand::MODIFY_TTL;
}

View File

@ -250,6 +250,7 @@ bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserIdentifierWithOptionalParameters ident_with_optional_params_p;
ParserExpression expression_p;
ParserSetQuery settings_p(/* parse_only_internals_ = */ true);
ParserTTLExpressionList parser_ttl_list;
ASTPtr engine;
ASTPtr partition_by;
@ -303,7 +304,7 @@ bool ParserStorage::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!ttl_table && s_ttl.ignore(pos, expected))
{
if (expression_p.parse(pos, ttl_table, expected))
if (parser_ttl_list.parse(pos, ttl_table, expected))
continue;
else
return false;

View File

@ -23,17 +23,6 @@ namespace CurrentMetrics
namespace DB
{
static constexpr double thread_sleep_seconds = 10;
static constexpr double thread_sleep_seconds_random_part = 1.0;
static constexpr double thread_sleep_seconds_if_nothing_to_do = 0.1;
/// For exponential backoff.
static constexpr double task_sleep_seconds_when_no_work_min = 10;
static constexpr double task_sleep_seconds_when_no_work_max = 600;
static constexpr double task_sleep_seconds_when_no_work_multiplier = 1.1;
static constexpr double task_sleep_seconds_when_no_work_random_part = 1.0;
void BackgroundProcessingPoolTaskInfo::wake()
{
Poco::Timestamp current_time;
@ -61,9 +50,13 @@ void BackgroundProcessingPoolTaskInfo::wake()
}
BackgroundProcessingPool::BackgroundProcessingPool(int size_, const char * log_name, const char * thread_name_)
BackgroundProcessingPool::BackgroundProcessingPool(int size_,
const PoolSettings & pool_settings,
const char * log_name,
const char * thread_name_)
: size(size_)
, thread_name(thread_name_)
, settings(pool_settings)
{
logger = &Logger::get(log_name);
LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");
@ -147,7 +140,7 @@ void BackgroundProcessingPool::threadFunction()
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
pcg64 rng(randomSeed());
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
std::this_thread::sleep_for(std::chrono::duration<double>(std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
while (!shutdown)
{
@ -182,8 +175,8 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock,
std::chrono::duration<double>(thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, thread_sleep_seconds_random_part)(rng)));
std::chrono::duration<double>(settings.thread_sleep_seconds
+ std::uniform_real_distribution<double>(0, settings.thread_sleep_seconds_random_part)(rng)));
continue;
}
@ -193,7 +186,7 @@ void BackgroundProcessingPool::threadFunction()
{
std::unique_lock lock(tasks_mutex);
wake_event.wait_for(lock, std::chrono::microseconds(
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, thread_sleep_seconds_random_part * 1000000)(rng)));
min_time - current_time + std::uniform_int_distribution<uint64_t>(0, settings.thread_sleep_seconds_random_part * 1000000)(rng)));
}
std::shared_lock rlock(task->rwlock);
@ -231,11 +224,11 @@ void BackgroundProcessingPool::threadFunction()
Poco::Timestamp next_time_to_execute; /// current time
if (task_result == TaskResult::ERROR)
next_time_to_execute += 1000000 * (std::min(
task_sleep_seconds_when_no_work_max,
task_sleep_seconds_when_no_work_min * std::pow(task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, task_sleep_seconds_when_no_work_random_part)(rng));
settings.task_sleep_seconds_when_no_work_max,
settings.task_sleep_seconds_when_no_work_min * std::pow(settings.task_sleep_seconds_when_no_work_multiplier, task->count_no_work_done))
+ std::uniform_real_distribution<double>(0, settings.task_sleep_seconds_when_no_work_random_part)(rng));
else if (task_result == TaskResult::NOTHING_TO_DO)
next_time_to_execute += 1000000 * thread_sleep_seconds_if_nothing_to_do;
next_time_to_execute += 1000000 * settings.thread_sleep_seconds_if_nothing_to_do;
tasks.erase(task->iterator);
task->iterator = tasks.emplace(next_time_to_execute, task);

View File

@ -14,6 +14,7 @@
#include <Core/Types.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
@ -46,7 +47,23 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>;
/// Timing parameters of the pool, used by threadFunction(). All values are in seconds.
struct PoolSettings
{
/// Base duration of a worker thread's timed wait on the wake event (a random part is added to it).
double thread_sleep_seconds = 10;
/// Upper bound of the uniformly distributed random addition to the thread sleeps and waits.
double thread_sleep_seconds_random_part = 1.0;
/// Delay before the next execution of a task whose result was NOTHING_TO_DO.
double thread_sleep_seconds_if_nothing_to_do = 0.1;
/// For exponential backoff.
/// After a task result of ERROR the next execution is delayed by
/// min(max, min * multiplier ^ count_no_work_done) plus a random part in [0, random_part).
double task_sleep_seconds_when_no_work_min = 10;
double task_sleep_seconds_when_no_work_max = 600;
double task_sleep_seconds_when_no_work_multiplier = 1.1;
double task_sleep_seconds_when_no_work_random_part = 1.0;
/// NOTE(review): presumably user-provided so that a `= {}` default argument of this type
/// compiles inside the enclosing class - confirm before removing.
PoolSettings() noexcept {}
};
BackgroundProcessingPool(int size_,
const PoolSettings & pool_settings = {},
const char * log_name = "BackgroundProcessingPool",
const char * thread_name_ = "BackgrProcPool");
@ -84,6 +101,9 @@ protected:
ThreadGroupStatusPtr thread_group;
void threadFunction();
private:
PoolSettings settings;
};

View File

@ -38,6 +38,7 @@
#include <Common/Increment.h>
#include <Common/SimpleIncrement.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Stopwatch.h>
#include <Common/typeid_cast.h>
@ -70,6 +71,12 @@ namespace CurrentMetrics
}
namespace
{
/// Lower bound applied to the requested size by the space reservation functions of MergeTreeData.
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; /// 1MB
}
namespace DB
{
@ -124,7 +131,6 @@ MergeTreeData::MergeTreeData(
, merging_params(merging_params_)
, partition_by_ast(partition_by_ast_)
, sample_by_ast(sample_by_ast_)
, ttl_table_ast(ttl_table_ast_)
, require_part_metadata(require_part_metadata_)
, database_name(database_)
, table_name(table_)
@ -566,15 +572,17 @@ void checkTTLExpression(const ExpressionActionsPtr & ttl_expression, const Strin
void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new_column_ttls,
const ASTPtr & new_ttl_table_ast, bool only_check)
{
auto create_ttl_entry = [this](ASTPtr ttl_ast) -> TTLEntry
auto create_ttl_entry = [this](ASTPtr ttl_ast)
{
TTLEntry result;
auto syntax_result = SyntaxAnalyzer(global_context).analyze(ttl_ast, getColumns().getAllPhysical());
auto expr = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
result.expression = ExpressionAnalyzer(ttl_ast, syntax_result, global_context).getActions(false);
result.destination_type = PartDestinationType::DELETE;
result.result_column = ttl_ast->getColumnName();
String result_column = ttl_ast->getColumnName();
checkTTLExpression(expr, result_column);
return {expr, result_column};
checkTTLExpression(result.expression, result.result_column);
return result;
};
if (!new_column_ttls.empty())
@ -592,23 +600,49 @@ void MergeTreeData::setTTLExpressions(const ColumnsDescription::ColumnTTLs & new
for (const auto & [name, ast] : new_column_ttls)
{
if (columns_ttl_forbidden.count(name))
throw Exception("Trying to set ttl for key column " + name, ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Trying to set TTL for key column " + name, ErrorCodes::ILLEGAL_COLUMN);
else
{
auto new_ttl_entry = create_ttl_entry(ast);
if (!only_check)
ttl_entries_by_name.emplace(name, new_ttl_entry);
column_ttl_entries_by_name.emplace(name, new_ttl_entry);
}
}
}
if (new_ttl_table_ast)
{
auto new_ttl_table_entry = create_ttl_entry(new_ttl_table_ast);
if (!only_check)
bool seen_delete_ttl = false;
for (auto ttl_element_ptr : new_ttl_table_ast->children)
{
ttl_table_ast = new_ttl_table_ast;
ttl_table_entry = new_ttl_table_entry;
ASTTTLElement & ttl_element = static_cast<ASTTTLElement &>(*ttl_element_ptr);
if (ttl_element.destination_type == PartDestinationType::DELETE)
{
if (seen_delete_ttl)
{
throw Exception("More than one DELETE TTL expression is not allowed", ErrorCodes::BAD_TTL_EXPRESSION);
}
auto new_ttl_table_entry = create_ttl_entry(ttl_element.children[0]);
if (!only_check)
{
ttl_table_ast = ttl_element.children[0];
ttl_table_entry = new_ttl_table_entry;
}
seen_delete_ttl = true;
}
else
{
auto new_ttl_entry = create_ttl_entry(ttl_element.children[0]);
if (!only_check)
{
new_ttl_entry.entry_ast = ttl_element_ptr;
new_ttl_entry.destination_type = ttl_element.destination_type;
new_ttl_entry.destination_name = ttl_element.destination_name;
move_ttl_entries.emplace_back(std::move(new_ttl_entry));
}
}
}
}
}
@ -3096,20 +3130,138 @@ MergeTreeData::MutableDataPartsVector MergeTreeData::tryLoadPartsToAttach(const
return loaded_parts;
}
DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size)
namespace
{
constexpr UInt64 RESERVATION_MIN_ESTIMATION_SIZE = 1u * 1024u * 1024u; /// 1MB
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto reservation = storage_policy->reserve(expected_size);
inline DiskSpace::ReservationPtr checkAndReturnReservation(UInt64 expected_size, DiskSpace::ReservationPtr reservation)
{
if (reservation)
return reservation;
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(expected_size) + ", not enough space.",
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(expected_size) + ", not enough space",
ErrorCodes::NOT_ENOUGH_SPACE);
}
}
/// Reserves max(expected_size, RESERVATION_MIN_ESTIMATION_SIZE) bytes via the storage policy.
/// The result goes through checkAndReturnReservation, which throws if nothing was reserved.
DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size) const
{
    const UInt64 size_to_reserve = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
    return checkAndReturnReservation(size_to_reserve, storage_policy->reserve(size_to_reserve));
}
/// Same as tryReserveSpace on the given `space`, but the result goes through
/// checkAndReturnReservation, which throws if nothing was reserved.
DiskSpace::ReservationPtr MergeTreeData::reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
    const UInt64 size_to_reserve = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
    return checkAndReturnReservation(size_to_reserve, tryReserveSpace(size_to_reserve, space));
}
/// Asks `space` to reserve max(expected_size, RESERVATION_MIN_ESTIMATION_SIZE) bytes
/// and returns whatever `space` returns, without checking it.
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const
{
    const UInt64 size_to_reserve = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
    return space->reserve(size_to_reserve);
}
/// Same as tryReserveSpacePreferringTTLRules, but the result goes through
/// checkAndReturnReservation, which throws if nothing was reserved.
DiskSpace::ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
        const MergeTreeDataPart::TTLInfos & ttl_infos,
        time_t time_of_move) const
{
    const UInt64 size_to_reserve = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
    return checkAndReturnReservation(
        size_to_reserve,
        tryReserveSpacePreferringTTLRules(size_to_reserve, ttl_infos, time_of_move));
}
/// Tries to reserve space on the destination of the move TTL rule chosen by selectTTLEntryForTTLInfos.
/// Falls back to the storage policy when no rule is chosen, when the rule's destination is not found
/// (a warning is logged for VOLUME and DISK destinations) or when the destination reserves nothing.
DiskSpace::ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
        const MergeTreeDataPart::TTLInfos & ttl_infos,
        time_t time_of_move) const
{
    expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);

    if (const auto * preferred_rule = selectTTLEntryForTTLInfos(ttl_infos, time_of_move))
    {
        if (DiskSpace::SpacePtr destination = preferred_rule->getDestination(storage_policy))
        {
            DiskSpace::ReservationPtr preferred_reservation = destination->reserve(expected_size);
            if (preferred_reservation)
                return preferred_reservation;
        }
        else if (preferred_rule->destination_type == PartDestinationType::VOLUME)
        {
            LOG_WARNING(log, "Would like to reserve space on volume '"
                << preferred_rule->destination_name << "' by TTL rule of table '"
                << log_name << "' but volume was not found");
        }
        else if (preferred_rule->destination_type == PartDestinationType::DISK)
        {
            LOG_WARNING(log, "Would like to reserve space on disk '"
                << preferred_rule->destination_name << "' by TTL rule of table '"
                << log_name << "' but disk was not found");
        }
    }

    return storage_policy->reserve(expected_size);
}
/// Looks up the rule's destination in `policy` by name: a volume for VOLUME, a disk for DISK.
/// Any other destination type gives an empty pointer.
DiskSpace::SpacePtr MergeTreeData::TTLEntry::getDestination(const DiskSpace::StoragePolicyPtr & policy) const
{
    if (destination_type == PartDestinationType::VOLUME)
        return policy->getVolumeByName(destination_name);

    if (destination_type == PartDestinationType::DISK)
        return policy->getDiskByName(destination_name);

    return {};
}
/// Checks whether the disk of `part` is the rule's destination disk, or one of the disks
/// of the rule's destination volume (disks are compared by name).
/// Returns false for any other destination type, and also when the destination volume or disk
/// is not found in `policy`: the lookup result is checked before use, because a missing
/// destination is handled as a regular case in tryReserveSpacePreferringTTLRules.
bool MergeTreeData::TTLEntry::isPartInDestination(const DiskSpace::StoragePolicyPtr & policy, const MergeTreeDataPart & part) const
{
    if (destination_type == PartDestinationType::VOLUME)
    {
        /// Held in a local so that the pointer stays alive for the whole loop.
        const auto volume = policy->getVolumeByName(destination_name);
        if (!volume)
            return false;

        for (const auto & disk : volume->disks)
            if (disk->getName() == part.disk->getName())
                return true;
    }
    else if (destination_type == PartDestinationType::DISK)
    {
        const auto disk = policy->getDiskByName(destination_name);
        return disk && disk->getName() == part.disk->getName();
    }

    return false;
}
/// Picks the move TTL rule to apply to a part with the given `ttl_infos` at `time_of_move`.
/// Only rules that have an entry in ttl_infos.moves_ttl with max <= time_of_move are considered;
/// among them the one with the greatest max wins (on a tie, the later one in move_ttl_entries).
/// Returns nullptr when no rule qualifies.
const MergeTreeData::TTLEntry * MergeTreeData::selectTTLEntryForTTLInfos(
        const MergeTreeDataPart::TTLInfos & ttl_infos,
        time_t time_of_move) const
{
    const MergeTreeData::TTLEntry * best_entry = nullptr;

    /// Prefer TTL rule which went into action last.
    time_t best_max_ttl = 0;

    for (const auto & candidate : move_ttl_entries)
    {
        const auto info_it = ttl_infos.moves_ttl.find(candidate.result_column);
        if (info_it == ttl_infos.moves_ttl.end())
            continue;

        const auto & candidate_max_ttl = info_it->second.max;
        if (candidate_max_ttl <= time_of_move && best_max_ttl <= candidate_max_ttl)
        {
            best_entry = &candidate;
            best_max_ttl = candidate_max_ttl;
        }
    }

    return best_entry;
}
MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affordable_states) const
{
DataParts res;
@ -3289,12 +3441,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
String dst_part_name = src_part->getNewName(dst_part_info);
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = src_part->disk->reserve(src_part->bytes_on_disk);
if (!reservation)
{
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(src_part->bytes_on_disk) + ", not enough space",
ErrorCodes::NOT_ENOUGH_SPACE);
}
auto reservation = reserveSpace(src_part->bytes_on_disk, src_part->disk);
String dst_part_path = getFullPathOnDisk(reservation->getDisk());
Poco::Path dst_part_absolute_path = Poco::Path(dst_part_path + tmp_dst_part_name).absolute();
Poco::Path src_part_absolute_path = Poco::Path(src_part->getFullPath()).absolute();

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/SimpleIncrement.h>
#include <Common/DiskSpaceMonitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Storages/IStorage.h>
@ -9,6 +10,7 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/PartDestinationType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
@ -19,7 +21,6 @@
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Interpreters/PartLog.h>
#include <Common/DiskSpaceMonitor.h>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
@ -565,7 +566,7 @@ public:
/// All MergeTreeData children have settings.
void checkSettingCanBeChanged(const String & setting_name) const override;
/// Remove columns, that have been markedd as empty after zeroing values with expired ttl
/// Remove columns, that have been marked as empty after zeroing values with expired ttl
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
/// Freezes all parts.
@ -587,7 +588,7 @@ public:
bool hasPrimaryKey() const { return !primary_key_columns.empty(); }
bool hasSkipIndices() const { return !skip_indices.empty(); }
bool hasTableTTL() const { return ttl_table_ast != nullptr; }
bool hasAnyColumnTTL() const { return !ttl_entries_by_name.empty(); }
bool hasAnyColumnTTL() const { return !column_ttl_entries_by_name.empty(); }
/// Check that the part is not broken and calculate the checksums for it if they are not present.
MutableDataPartPtr loadPartAndFixMetadata(const DiskSpace::DiskPtr & disk, const String & relative_path);
@ -673,9 +674,20 @@ public:
using PathsWithDisks = std::vector<PathWithDisk>;
PathsWithDisks getDataPathsWithDisks() const;
/// Reserves space at least 1MB
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size);
/// Reserves space at least 1MB.
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size) const;
/// Reserves space at least 1MB on specific disk or volume.
DiskSpace::ReservationPtr reserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
DiskSpace::ReservationPtr tryReserveSpace(UInt64 expected_size, DiskSpace::SpacePtr space) const;
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
DiskSpace::ReservationPtr reserveSpacePreferringTTLRules(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
DiskSpace::ReservationPtr tryReserveSpacePreferringTTLRules(UInt64 expected_size,
const MergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move) const;
/// Choose disk with max available free space
/// Reserves 0 bytes
DiskSpace::ReservationPtr makeEmptyReservationOnLargestDisk() { return storage_policy->makeEmptyReservationOnLargestDisk(); }
@ -719,12 +731,27 @@ public:
{
ExpressionActionsPtr expression;
String result_column;
/// Name and type of a destination are only valid in table-level context.
PartDestinationType destination_type;
String destination_name;
ASTPtr entry_ast;
/// Returns destination disk or volume for this rule.
DiskSpace::SpacePtr getDestination(const DiskSpace::StoragePolicyPtr & policy) const;
/// Checks if given part already belongs to the destination disk or volume for this rule.
bool isPartInDestination(const DiskSpace::StoragePolicyPtr & policy, const MergeTreeDataPart & part) const;
};
const TTLEntry * selectTTLEntryForTTLInfos(const MergeTreeDataPart::TTLInfos & ttl_infos, time_t time_of_move) const;
using TTLEntriesByName = std::unordered_map<String, TTLEntry>;
TTLEntriesByName ttl_entries_by_name;
TTLEntriesByName column_ttl_entries_by_name;
TTLEntry ttl_table_entry;
std::vector<TTLEntry> move_ttl_entries;
String sampling_expr_column_name;
Names columns_required_for_sampling;

View File

@ -1,6 +1,7 @@
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/quoteString.h>
#include <common/JSON.h>
@ -15,10 +16,16 @@ void MergeTreeDataPartTTLInfos::update(const MergeTreeDataPartTTLInfos & other_i
updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
}
for (const auto & [expression, ttl_info] : other_infos.moves_ttl)
{
moves_ttl[expression].update(ttl_info);
}
table_ttl.update(other_infos.table_ttl);
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
}
void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
{
String json_str;
@ -28,7 +35,7 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
JSON json(json_str);
if (json.has("columns"))
{
JSON columns = json["columns"];
const JSON & columns = json["columns"];
for (auto col : columns)
{
MergeTreeDataPartTTLInfo ttl_info;
@ -42,14 +49,27 @@ void MergeTreeDataPartTTLInfos::read(ReadBuffer & in)
}
if (json.has("table"))
{
JSON table = json["table"];
const JSON & table = json["table"];
table_ttl.min = table["min"].getUInt();
table_ttl.max = table["max"].getUInt();
updatePartMinMaxTTL(table_ttl.min, table_ttl.max);
}
if (json.has("moves"))
{
const JSON & moves = json["moves"];
for (auto move : moves)
{
MergeTreeDataPartTTLInfo ttl_info;
ttl_info.min = move["min"].getUInt();
ttl_info.max = move["max"].getUInt();
String expression = move["expression"].getString();
moves_ttl.emplace(expression, ttl_info);
}
}
}
void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
{
writeString("ttl format version: 1\n", out);
@ -62,9 +82,9 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
if (it != columns_ttl.begin())
writeString(",", out);
writeString("{\"name\":\"", out);
writeString(it->first, out);
writeString("\",\"min\":", out);
writeString("{\"name\":", out);
writeString(doubleQuoteString(it->first), out);
writeString(",\"min\":", out);
writeIntText(it->second.min, out);
writeString(",\"max\":", out);
writeIntText(it->second.max, out);
@ -82,6 +102,26 @@ void MergeTreeDataPartTTLInfos::write(WriteBuffer & out) const
writeIntText(table_ttl.max, out);
writeString("}", out);
}
if (!moves_ttl.empty())
{
if (!columns_ttl.empty() || table_ttl.min)
writeString(",", out);
writeString("\"moves\":[", out);
for (auto it = moves_ttl.begin(); it != moves_ttl.end(); ++it)
{
if (it != moves_ttl.begin())
writeString(",", out);
writeString("{\"expression\":", out);
writeString(doubleQuoteString(it->first), out);
writeString(",\"min\":", out);
writeIntText(it->second.min, out);
writeString(",\"max\":", out);
writeIntText(it->second.max, out);
writeString("}", out);
}
writeString("]", out);
}
writeString("}", out);
}

View File

@ -35,9 +35,14 @@ struct MergeTreeDataPartTTLInfos
{
std::unordered_map<String, MergeTreeDataPartTTLInfo> columns_ttl;
MergeTreeDataPartTTLInfo table_ttl;
/// `part_min_ttl` and `part_max_ttl` are TTLs which are used for selecting parts
/// to merge in order to remove expired rows.
time_t part_min_ttl = 0;
time_t part_max_ttl = 0;
std::unordered_map<String, MergeTreeDataPartTTLInfo> moves_ttl;
void read(ReadBuffer & in);
void write(WriteBuffer & out) const;
void update(const MergeTreeDataPartTTLInfos & other_infos);
@ -50,6 +55,11 @@ struct MergeTreeDataPartTTLInfos
if (time_max && (!part_max_ttl || time_max > part_max_ttl))
part_max_ttl = time_max;
}
bool empty()
{
return !part_min_ttl && moves_ttl.empty();
}
};
}

View File

@ -75,12 +75,17 @@ void buildScatterSelector(
}
/// Computes ttls and updates ttl infos
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTLInfos & ttl_infos, Block & block, const String & column_name)
void updateTTL(const MergeTreeData::TTLEntry & ttl_entry,
MergeTreeDataPart::TTLInfos & ttl_infos,
DB::MergeTreeDataPartTTLInfo & ttl_info,
Block & block, bool update_part_min_max_ttls)
{
bool remove_column = false;
if (!block.has(ttl_entry.result_column))
{
ttl_entry.expression->execute(block);
auto & ttl_info = (column_name.empty() ? ttl_infos.table_ttl : ttl_infos.columns_ttl[column_name]);
remove_column = true;
}
const auto & current = block.getByName(ttl_entry.result_column);
@ -113,7 +118,11 @@ void updateTTL(const MergeTreeData::TTLEntry & ttl_entry, MergeTreeDataPart::TTL
else
throw Exception("Unexpected type of result TTL column", ErrorCodes::LOGICAL_ERROR);
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
if (update_part_min_max_ttls)
ttl_infos.updatePartMinMaxTTL(ttl_info.min, ttl_info.max);
if (remove_column)
block.erase(ttl_entry.result_column);
}
}
@ -212,10 +221,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
else
part_name = new_part_info.getPartName();
/// Size of part would not be grater than block.bytes() + epsilon
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
auto reservation = data.reserveSpace(expected_size);
DB::MergeTreeDataPart::TTLInfos move_ttl_infos;
for (const auto & ttl_entry : data.move_ttl_entries)
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
DiskSpace::ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr));
MergeTreeData::MutableDataPartPtr new_data_part =
std::make_shared<MergeTreeData::DataPart>(data, reservation->getDisk(), part_name, new_part_info);
@ -251,7 +264,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
ProfileEvents::increment(ProfileEvents::MergeTreeDataWriterBlocks);
/// Sort.
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
@ -266,10 +279,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
}
if (data.hasTableTTL())
updateTTL(data.ttl_table_entry, new_data_part->ttl_infos, block, "");
updateTTL(data.ttl_table_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.table_ttl, block, true);
for (const auto & [name, ttl_entry] : data.ttl_entries_by_name)
updateTTL(ttl_entry, new_data_part->ttl_infos, block, name);
for (const auto & [name, ttl_entry] : data.column_ttl_entries_by_name)
updateTTL(ttl_entry, new_data_part->ttl_infos, new_data_part->ttl_infos.columns_ttl[name], block, true);
new_data_part->ttl_infos.update(move_ttl_infos);
/// This effectively chooses minimal compression method:
/// either default lz4 or compression method with zero thresholds on absolute and relative part size.

View File

@ -52,11 +52,14 @@ public:
elems.emplace(part);
current_size_sum += part->bytes_on_disk;
while (!elems.empty() && (current_size_sum - (*elems.begin())->bytes_on_disk >= required_size_sum))
{
current_size_sum -= (*elems.begin())->bytes_on_disk;
elems.erase(elems.begin());
}
removeRedundantElements();
}
/// Reduce `required_size_sum` by `size_decrease` (never going below zero),
/// then drop the elements that became redundant for the new requirement.
void decreaseRequiredSizeAndRemoveRedundantParts(UInt64 size_decrease)
{
    if (size_decrease >= required_size_sum)
        required_size_sum = 0;
    else
        required_size_sum -= size_decrease;

    removeRedundantElements();
}
/// Returns parts ordered by size
@ -67,6 +70,16 @@ public:
res.push_back(elem);
return res;
}
private:
/// Erase elements from the front of `elems` for as long as the remaining
/// accumulated size still reaches `required_size_sum` without them.
void removeRedundantElements()
{
    while (!elems.empty())
    {
        auto first = elems.begin();

        /// Stop as soon as dropping the first element would leave less than required.
        if (current_size_sum - (*first)->bytes_on_disk < required_size_sum)
            break;

        current_size_sum -= (*first)->bytes_on_disk;
        elems.erase(first);
    }
}
};
}
@ -85,46 +98,70 @@ bool MergeTreePartsMover::selectPartsForMove(
const auto & policy = data->getStoragePolicy();
const auto & volumes = policy->getVolumes();
/// Do not check if policy has one volume
if (volumes.size() == 1)
return false;
/// Do not check last volume
for (size_t i = 0; i != volumes.size() - 1; ++i)
if (volumes.size() > 0)
{
for (const auto & disk : volumes[i]->disks)
/// Do not check last volume
for (size_t i = 0; i != volumes.size() - 1; ++i)
{
UInt64 required_available_space = disk->getTotalSpace() * policy->getMoveFactor();
UInt64 unreserved_space = disk->getUnreservedSpace();
for (const auto & disk : volumes[i]->disks)
{
UInt64 required_maximum_available_space = disk->getTotalSpace() * policy->getMoveFactor();
UInt64 unreserved_space = disk->getUnreservedSpace();
if (required_available_space > unreserved_space)
need_to_move.emplace(disk, required_available_space - unreserved_space);
if (unreserved_space < required_maximum_available_space)
need_to_move.emplace(disk, required_maximum_available_space - unreserved_space);
}
}
}
time_t time_of_move = time(nullptr);
for (const auto & part : data_parts)
{
String reason;
/// Don't report message to log, because logging is excessive
/// Don't report message to log, because logging is excessive.
if (!can_move(part, &reason))
continue;
const MergeTreeData::TTLEntry * ttl_entry_ptr = part->storage.selectTTLEntryForTTLInfos(part->ttl_infos, time_of_move);
auto to_insert = need_to_move.find(part->disk);
if (to_insert != need_to_move.end())
to_insert->second.add(part);
DiskSpace::ReservationPtr reservation;
if (ttl_entry_ptr)
{
auto destination = ttl_entry_ptr->getDestination(policy);
if (destination && !ttl_entry_ptr->isPartInDestination(policy, *part))
reservation = part->storage.tryReserveSpace(part->bytes_on_disk, ttl_entry_ptr->getDestination(policy));
}
if (reservation) /// Found reservation by TTL rule.
{
parts_to_move.emplace_back(part, std::move(reservation));
/// If table TTL rule satisfies on this part, won't apply policy rules on it.
/// In order to not over-move, we need to "release" required space on this disk,
/// possibly to zero.
if (to_insert != need_to_move.end())
{
to_insert->second.decreaseRequiredSizeAndRemoveRedundantParts(part->bytes_on_disk);
}
}
else
{
if (to_insert != need_to_move.end())
to_insert->second.add(part);
}
}
for (auto && move : need_to_move)
{
auto min_volume_priority = policy->getVolumeIndexByDisk(move.first) + 1;
auto min_volume_index = policy->getVolumeIndexByDisk(move.first) + 1;
for (auto && part : move.second.getAccumulatedParts())
{
auto reservation = policy->reserve(part->bytes_on_disk, min_volume_priority);
auto reservation = policy->reserve(part->bytes_on_disk, min_volume_index);
if (!reservation)
{
/// Next parts to move from this disk has greater size and same min volume priority
/// There are no space for them
/// But it can be possible to move data from other disks
/// Next parts to move from this disk have greater size and same min volume index.
/// There is no space for them.
/// But it can be possible to move data from other disks.
break;
}
parts_to_move.emplace_back(part, std::move(reservation));

View File

@ -186,7 +186,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
}
if (new_part->ttl_infos.part_min_ttl)
if (!new_part->ttl_infos.empty())
{
/// Write a file with ttl infos in json format.
WriteBufferFromFile out(part_path + "ttl.txt", 4096);

View File

@ -0,0 +1,14 @@
#pragma once
namespace DB
{

/// Kind of destination that can be chosen for a data part.
enum class PartDestinationType
{
    /// The destination is a disk.
    DISK,

    /// The destination is a volume.
    VOLUME,

    /// No storage destination; presumably matches the TTL `DELETE` clause (confirm against the parser).
    DELETE,
};

}

View File

@ -5,6 +5,7 @@
#include <Parsers/ExpressionListParsers.h>
#include <IO/Operators.h>
namespace DB
{
@ -47,6 +48,16 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
partition_key = formattedAST(MergeTreeData::extractKeyExpressionList(data.partition_by_ast));
ttl_table = formattedAST(data.ttl_table_ast);
std::ostringstream ttl_move_stream;
for (const auto & ttl_entry : data.move_ttl_entries)
{
if (ttl_move_stream.tellp() > 0)
ttl_move_stream << ", ";
ttl_move_stream << formattedAST(ttl_entry.entry_ast);
}
ttl_move = ttl_move_stream.str();
skip_indices = data.getIndices().toString();
if (data.canUseAdaptiveGranularity())
index_granularity_bytes = data_settings->index_granularity_bytes;
@ -78,6 +89,9 @@ void ReplicatedMergeTreeTableMetadata::write(WriteBuffer & out) const
if (!ttl_table.empty())
out << "ttl: " << ttl_table << "\n";
if (!ttl_move.empty())
out << "move ttl: " << ttl_move << "\n";
if (!skip_indices.empty())
out << "indices: " << skip_indices << "\n";
@ -119,6 +133,9 @@ void ReplicatedMergeTreeTableMetadata::read(ReadBuffer & in)
if (checkString("ttl: ", in))
in >> ttl_table >> "\n";
if (checkString("move ttl: ", in))
in >> ttl_move >> "\n";
if (checkString("indices: ", in))
in >> skip_indices >> "\n";
@ -223,12 +240,27 @@ ReplicatedMergeTreeTableMetadata::checkAndFindDiff(const ReplicatedMergeTreeTabl
}
else
throw Exception(
"Existing table metadata in ZooKeeper differs in ttl."
"Existing table metadata in ZooKeeper differs in TTL."
" Stored in ZooKeeper: " + from_zk.ttl_table +
", local: " + ttl_table,
ErrorCodes::METADATA_MISMATCH);
}
if (ttl_move != from_zk.ttl_move)
{
if (allow_alter)
{
diff.ttl_move_changed = true;
diff.new_ttl_move = from_zk.ttl_move;
}
else
throw Exception(
"Existing table metadata in ZooKeeper differs in move TTL."
" Stored in ZooKeeper: " + from_zk.ttl_move +
", local: " + ttl_move,
ErrorCodes::METADATA_MISMATCH);
}
if (skip_indices != from_zk.skip_indices)
{
if (allow_alter)

View File

@ -28,6 +28,7 @@ struct ReplicatedMergeTreeTableMetadata
String skip_indices;
String constraints;
String ttl_table;
String ttl_move;
UInt64 index_granularity_bytes;
ReplicatedMergeTreeTableMetadata() = default;
@ -53,9 +54,12 @@ struct ReplicatedMergeTreeTableMetadata
bool ttl_table_changed = false;
String new_ttl_table;
bool ttl_move_changed = false;
String new_ttl_move;
bool empty() const
{
return !sorting_key_changed && !skip_indices_changed && !ttl_table_changed && !constraints_changed;
return !sorting_key_changed && !skip_indices_changed && !ttl_table_changed && !constraints_changed && !ttl_move_changed;
}
};

View File

@ -1,5 +1,6 @@
#include <Storages/PartitionCommands.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/PartDestinationType.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -47,12 +48,14 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.part = command_ast->part;
switch (command_ast->move_destination_type)
{
case ASTAlterCommand::MoveDestinationType::DISK:
case PartDestinationType::DISK:
res.move_destination_type = PartitionCommand::MoveDestinationType::DISK;
break;
case ASTAlterCommand::MoveDestinationType::VOLUME:
case PartDestinationType::VOLUME:
res.move_destination_type = PartitionCommand::MoveDestinationType::VOLUME;
break;
default:
break;
}
res.move_destination_name = command_ast->move_destination_name;
return res;

View File

@ -350,9 +350,15 @@ public:
/// If we mutate a part, then we should reserve space on the same disk, because mutations can possibly create hardlinks.
if (is_mutation)
reserved_space = future_part_.parts[0]->disk->reserve(total_size);
reserved_space = storage.tryReserveSpace(total_size, future_part_.parts[0]->disk);
else
reserved_space = storage.reserveSpace(total_size);
{
MergeTreeDataPart::TTLInfos ttl_infos;
for (auto & part_ptr : future_part_.parts)
ttl_infos.update(part_ptr->ttl_infos);
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr));
}
if (!reserved_space)
{
if (is_mutation)

View File

@ -1005,8 +1005,14 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
/// Start to make the main work
size_t estimated_space_for_merge = MergeTreeDataMergerMutator::estimateNeededDiskSpace(parts);
/// Can throw an exception.
DiskSpace::ReservationPtr reserved_space = reserveSpace(estimated_space_for_merge);
/// Can throw an exception while reserving space.
MergeTreeDataPart::TTLInfos ttl_infos;
for (auto & part_ptr : parts)
{
ttl_infos.update(part_ptr->ttl_infos);
}
DiskSpace::ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr));
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);
@ -1139,14 +1145,9 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
entry.new_part_name, format_version);
MutationCommands commands = queue.getMutationCommands(source_part, new_part_info.mutation);
/// Can throw an exception.
/// Once we mutate part, we must reserve space on the same disk, because mutations can possibly create hardlinks.
DiskSpace::ReservationPtr reserved_space = source_part->disk->reserve(estimated_space_for_result);
if (!reserved_space)
{
throw Exception("Cannot reserve " + formatReadableSizeWithBinarySuffix(estimated_space_for_result) + ", not enough space",
ErrorCodes::NOT_ENOUGH_SPACE);
}
/// Can throw an exception.
DiskSpace::ReservationPtr reserved_space = reserveSpace(estimated_space_for_result, source_part->disk);
auto table_lock = lockStructureForShare(false, RWLockImpl::NO_QUERY);

View File

@ -0,0 +1,16 @@
<yandex>
<remote_servers>
<test_cluster>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
</test_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,4 @@
<yandex>
<background_move_processing_pool_thread_sleep_seconds>0.5</background_move_processing_pool_thread_sleep_seconds>
<background_move_processing_pool_task_sleep_seconds_when_no_work_max>0.5</background_move_processing_pool_task_sleep_seconds_when_no_work_max>
</yandex>

View File

@ -0,0 +1,76 @@
<yandex>
<storage_configuration>
<disks>
<default>
</default>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
</jbod2>
<external>
<path>/external/</path>
</external>
</disks>
<policies>
<external_with_jbods>
<volumes>
<external>
<disk>external</disk>
</external>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
</main>
</volumes>
</external_with_jbods>
<jbods_with_external>
<volumes>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</jbods_with_external>
<small_jbod_with_external>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>external</disk>
</external>
</volumes>
</small_jbod_with_external>
<jbod1_with_jbod2>
<volumes>
<main>
<disk>jbod1</disk>
</main>
<external>
<disk>jbod2</disk>
</external>
</volumes>
</jbod1_with_jbod2>
<only_jbod2>
<volumes>
<main>
<disk>jbod2</disk>
</main>
</volumes>
</only_jbod2>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,17 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
<part_log>
<database>system</database>
<table>part_log</table>
<flush_interval_milliseconds>500</flush_interval_milliseconds>
</part_log>
</yandex>

View File

@ -0,0 +1,442 @@
import json
import pytest
import random
import re
import string
import threading
import time
from multiprocessing.dummy import Pool
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
# Cluster shared by every test in this module.
cluster = ClickHouseCluster(__file__)

# Both instances are configured identically except for the "replica" macro.
# Each one gets three tmpfs mounts: /jbod1 and /jbod2 (40M each) and /external (200M).
node1 = cluster.add_instance('node1',
            config_dir='configs',
            main_configs=['configs/logs_config.xml'],
            with_zookeeper=True,
            tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
            macros={"shard": 0, "replica": 1} )

node2 = cluster.add_instance('node2',
            config_dir='configs',
            main_configs=['configs/logs_config.xml'],
            with_zookeeper=True,
            tmpfs=['/jbod1:size=40M', '/jbod2:size=40M', '/external:size=200M'],
            macros={"shard": 0, "replica": 2} )
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, yield it to the tests, always shut it down."""
    try:
        cluster.start()
        yield cluster

    finally:
        # Runs even if start() or a test raised.
        cluster.shutdown()
def get_random_string(length):
    """Return a random string of `length` characters from uppercase ASCII letters and digits.

    The string is built with str.join over random.choice, which works on both
    Python 2 and Python 3. The previous bytes()/bytearray construction raises
    TypeError on Python 3 (bytes() of a str needs an encoding, and a bytearray
    item cannot be assigned a str).
    """
    symbols = string.ascii_uppercase + string.digits
    return ''.join(random.choice(symbols) for _ in range(length))
def get_used_disks_for_table(node, table_name):
    """Return the disk_name of every active part of `table_name`, ordered by modification time.

    The list has one entry per part, so the same disk can appear several times.
    """
    sql = "select disk_name from system.parts where table == '{}' and active=1 order by modification_time".format(table_name)
    raw_output = node.query(sql)
    return raw_output.strip().split('\n')
@pytest.mark.parametrize("name,engine,positive", [
    ("mt_test_inserts_to_disk_do_not_work","MergeTree()",0),
    ("replicated_mt_test_inserts_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_do_not_work', '1')",0),
    ("mt_test_inserts_to_disk_work","MergeTree()",1),
    ("replicated_mt_test_inserts_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_disk_work', '1')",1),
])
def test_inserts_to_disk_work(started_cluster, name, engine, positive):
    """Check which disk a freshly inserted part lands on for a `TTL d1 TO DISK 'external'` table.

    positive=1: every row has d1 already in the past; the part is expected on "external".
    positive=0: the first row has d1 300 seconds in the future; the part is expected on "jbod1".
    """
    try:
        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

        data = [] # 10MB in total
        for i in range(10):
            # Only row 0 of the negative case gets a TTL value in the future.
            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row

        node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external" if positive else "jbod1"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
    ("mt_test_moves_to_disk_do_not_work","MergeTree()",0),
    ("replicated_mt_test_moves_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_do_not_work', '1')",0),
    ("mt_test_moves_to_disk_work","MergeTree()",1),
    ("replicated_mt_test_moves_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_work', '1')",1),
])
def test_moves_to_disk_work(started_cluster, name, engine, positive):
    """Check that a part inserted on "jbod1" is moved to "external" once its TTL has passed.

    positive=1: every row expires at time_1; the part is expected on "external" after the wait.
    positive=0: the first row expires only at time_2, which is later than the final check,
    so the part is expected to still be on "jbod1".
    """
    try:
        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

        wait_expire_1 = 6
        wait_expire_2 = 4
        time_1 = time.time() + wait_expire_1
        time_2 = time.time() + wait_expire_1 + wait_expire_2

        # The sleep runs in a background thread so that the wait overlaps with the insert below.
        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
        wait_expire_1_thread.start()

        data = [] # 10MB in total
        for i in range(10):
            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row

        node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}

        wait_expire_1_thread.join()
        # Check half-way between time_1 and time_2.
        time.sleep(wait_expire_2/2)

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external" if positive else "jbod1"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine", [
    ("mt_test_moves_to_volume_work","MergeTree()"),
    ("replicated_mt_test_moves_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_volume_work', '1')"),
])
def test_moves_to_volume_work(started_cluster, name, engine):
    """Check that parts of a `TTL d1 TO VOLUME 'external'` table move to "external" after expiry.

    Two partitions of five rows each are inserted. They are expected on "jbod1" and
    "jbod2" first, and on "external" once time_1 has passed.
    """
    try:
        node1.query("""
CREATE TABLE {name} (
p1 Int64,
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY p1
TTL d1 TO VOLUME 'external'
SETTINGS storage_policy='jbods_with_external'
""".format(name=name, engine=engine))

        wait_expire_1 = 10
        time_1 = time.time() + wait_expire_1

        # The sleep runs in a background thread so that the wait overlaps with the inserts below.
        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
        wait_expire_1_thread.start()

        for p in range(2):
            data = [] # 10MB in total
            for i in range(5):
                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row

            node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {'jbod1', 'jbod2'}

        wait_expire_1_thread.join()
        time.sleep(1)

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
    ("mt_test_inserts_to_volume_do_not_work","MergeTree()",0),
    ("replicated_mt_test_inserts_to_volume_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_do_not_work', '1')",0),
    ("mt_test_inserts_to_volume_work","MergeTree()",1),
    ("replicated_mt_test_inserts_to_volume_work","ReplicatedMergeTree('/clickhouse/replicated_test_inserts_to_volume_work', '1')",1),
])
def test_inserts_to_volume_work(started_cluster, name, engine, positive):
    """Check which disk freshly inserted parts land on for a `TTL d1 TO VOLUME 'external'` table.

    Moves are stopped with SYSTEM STOP MOVES before inserting.
    positive=1: every row has d1 in the past; the parts are expected on "external".
    positive=0: the first row of each insert has d1 300 seconds in the future; the
    parts are expected on "jbod1".
    """
    try:
        node1.query("""
CREATE TABLE {name} (
p1 Int64,
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
PARTITION BY p1
TTL d1 TO VOLUME 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

        node1.query("SYSTEM STOP MOVES {name}".format(name=name))

        for p in range(2):
            data = [] # 20MB in total
            for i in range(10):
                data.append((str(p), "'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1 if i > 0 or positive else time.time()+300))) # 1MB row

            node1.query("INSERT INTO {} (p1, s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external" if positive else "jbod1"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "20"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine", [
    ("mt_test_moves_to_disk_eventually_work","MergeTree()"),
    ("replicated_mt_test_moves_to_disk_eventually_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_to_disk_eventually_work', '1')"),
])
def test_moves_to_disk_eventually_work(started_cluster, name, engine):
    """Check that an expired part reaches "jbod2" once a temporary table on that disk is dropped.

    A temporary table first puts 35 rows of 1MB on "jbod2". The table under test,
    with `TTL d1 TO DISK 'jbod2'`, is then expected to keep its already expired part
    on "jbod1". After the temporary table is dropped, the part is expected on "jbod2".
    """
    try:
        name_temp = name + "_temp"

        node1.query("""
CREATE TABLE {name} (
s1 String
) ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='only_jbod2'
""".format(name=name_temp))

        data = [] # 35MB in total
        for i in range(35):
            data.append(get_random_string(1024 * 1024)) # 1MB row

        node1.query("INSERT INTO {} VALUES {}".format(name_temp, ",".join(["('" + x + "')" for x in data])))
        used_disks = get_used_disks_for_table(node1, name_temp)
        assert set(used_disks) == {"jbod2"}

        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'jbod2'
SETTINGS storage_policy='jbod1_with_jbod2'
""".format(name=name, engine=engine))

        data = [] # 10MB in total
        for i in range(10):
            data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time.time()-1))) # 1MB row

        node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}

        # Drop the temporary table that was occupying "jbod2", then wait before re-checking.
        node1.query("DROP TABLE {}".format(name_temp))

        time.sleep(2)
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod2"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "10"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name_temp))
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
    ("mt_test_merges_to_disk_do_not_work","MergeTree()",0),
    ("replicated_mt_test_merges_to_disk_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_do_not_work', '1')",0),
    ("mt_test_merges_to_disk_work","MergeTree()",1),
    ("replicated_mt_test_merges_to_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_to_disk_work', '1')",1),
])
def test_merges_to_disk_work(started_cluster, name, engine, positive):
    """Check which disk the result of a merge lands on for a `TTL d1 TO DISK 'external'` table.

    Merges and moves are stopped, two parts are inserted on "jbod1", and after time_1
    merges are started again and OPTIMIZE is run.
    positive=1: every row expires at time_1; the merged part is expected on "external".
    positive=0: the first row of each insert expires only at time_2, after the final
    check; the merged part is expected on "jbod1".
    """
    try:
        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

        node1.query("SYSTEM STOP MERGES {}".format(name))
        node1.query("SYSTEM STOP MOVES {}".format(name))

        wait_expire_1 = 10
        wait_expire_2 = 4
        time_1 = time.time() + wait_expire_1
        time_2 = time.time() + wait_expire_1 + wait_expire_2

        # The sleep runs in a background thread so that the wait overlaps with the inserts below.
        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
        wait_expire_1_thread.start()

        for _ in range(2):
            data = [] # 16MB in total
            for i in range(8):
                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row

            node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}
        assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()

        wait_expire_1_thread.join()
        # Check half-way between time_1 and time_2.
        time.sleep(wait_expire_2/2)

        node1.query("SYSTEM START MERGES {}".format(name))
        node1.query("OPTIMIZE TABLE {}".format(name))

        time.sleep(1)
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external" if positive else "jbod1"}
        assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "16"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine", [
    ("mt_test_merges_with_full_disk_work","MergeTree()"),
    ("replicated_mt_test_merges_with_full_disk_work","ReplicatedMergeTree('/clickhouse/replicated_test_merges_with_full_disk_work', '1')"),
])
def test_merges_with_full_disk_work(started_cluster, name, engine):
    """Check that a merge still succeeds when the TTL destination disk is occupied.

    A temporary table first puts 35 rows of 1MB on "jbod2". Two parts of the table
    under test, with `TTL d1 TO DISK 'jbod2'`, are inserted on "jbod1". After expiry,
    OPTIMIZE is expected to produce one merged part that stays on "jbod1".
    """
    try:
        name_temp = name + "_temp"

        node1.query("""
CREATE TABLE {name} (
s1 String
) ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS storage_policy='only_jbod2'
""".format(name=name_temp))

        data = [] # 35MB in total
        for i in range(35):
            data.append(get_random_string(1024 * 1024)) # 1MB row

        node1.query("INSERT INTO {} VALUES {}".format(name_temp, ",".join(["('" + x + "')" for x in data])))
        used_disks = get_used_disks_for_table(node1, name_temp)
        assert set(used_disks) == {"jbod2"}

        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'jbod2'
SETTINGS storage_policy='jbod1_with_jbod2'
""".format(name=name, engine=engine))

        wait_expire_1 = 10
        time_1 = time.time() + wait_expire_1

        # The sleep runs in a background thread so that the wait overlaps with the inserts below.
        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
        wait_expire_1_thread.start()

        for _ in range(2):
            data = [] # 12MB in total
            for i in range(6):
                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1))) # 1MB row

            node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}
        assert "2" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()

        wait_expire_1_thread.join()

        node1.query("OPTIMIZE TABLE {}".format(name))

        time.sleep(1)
        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"} # Merged to the same disk against the rule.
        assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "12"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name_temp))
        node1.query("DROP TABLE IF EXISTS {}".format(name))
@pytest.mark.parametrize("name,engine,positive", [
    ("mt_test_moves_after_merges_do_not_work","MergeTree()",0),
    ("replicated_mt_test_moves_after_merges_do_not_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_do_not_work', '1')",0),
    ("mt_test_moves_after_merges_work","MergeTree()",1),
    ("replicated_mt_test_moves_after_merges_work","ReplicatedMergeTree('/clickhouse/replicated_test_moves_after_merges_work', '1')",1),
])
def test_moves_after_merges_work(started_cluster, name, engine, positive):
    """Check that a part produced by a merge is still moved by its TTL rule afterwards.

    Two inserts are merged with OPTIMIZE before expiry; the merged part is expected on "jbod1".
    positive=1: every row expires at time_1; the part is expected on "external" after the wait.
    positive=0: the first row of each insert expires only at time_2, after the final
    check; the part is expected to stay on "jbod1".
    """
    try:
        node1.query("""
CREATE TABLE {name} (
s1 String,
d1 DateTime
) ENGINE = {engine}
ORDER BY tuple()
TTL d1 TO DISK 'external'
SETTINGS storage_policy='small_jbod_with_external'
""".format(name=name, engine=engine))

        wait_expire_1 = 10
        wait_expire_2 = 4
        time_1 = time.time() + wait_expire_1
        time_2 = time.time() + wait_expire_1 + wait_expire_2

        # The sleep runs in a background thread so that the wait overlaps with the inserts and merge below.
        wait_expire_1_thread = threading.Thread(target=time.sleep, args=(wait_expire_1,))
        wait_expire_1_thread.start()

        for _ in range(2):
            data = [] # 16MB in total
            for i in range(8):
                data.append(("'{}'".format(get_random_string(1024 * 1024)), "toDateTime({})".format(time_1 if i > 0 or positive else time_2))) # 1MB row

            node1.query("INSERT INTO {} (s1, d1) VALUES {}".format(name, ",".join(["(" + ",".join(x) + ")" for x in data])))

        node1.query("OPTIMIZE TABLE {}".format(name))
        time.sleep(1)

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"jbod1"}
        assert "1" == node1.query("SELECT count() FROM system.parts WHERE table = '{}' AND active = 1".format(name)).strip()

        wait_expire_1_thread.join()
        # Check half-way between time_1 and time_2.
        time.sleep(wait_expire_2/2)

        used_disks = get_used_disks_for_table(node1, name)
        assert set(used_disks) == {"external" if positive else "jbod1"}

        assert node1.query("SELECT count() FROM {name}".format(name=name)).strip() == "16"

    finally:
        node1.query("DROP TABLE IF EXISTS {}".format(name))

View File

@ -1,6 +1,7 @@
import time
import pytest
import helpers.client as client
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
@ -9,7 +10,7 @@ node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module")
def start_cluster():
def started_cluster():
try:
cluster.start()
@ -25,7 +26,7 @@ def drop_table(nodes, table_name):
for node in nodes:
node.query("DROP TABLE IF EXISTS {}".format(table_name))
def test_ttl_columns(start_cluster):
def test_ttl_columns(started_cluster):
drop_table([node1, node2], "test_ttl")
for node in [node1, node2]:
node.query(
@ -43,8 +44,12 @@ def test_ttl_columns(start_cluster):
expected = "1\t0\t0\n2\t0\t0\n"
assert TSV(node1.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected)
assert TSV(node2.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected)
def test_ttl_table(start_cluster):
@pytest.mark.parametrize("delete_suffix", [
"",
"DELETE",
])
def test_ttl_table(started_cluster, delete_suffix):
drop_table([node1, node2], "test_ttl")
for node in [node1, node2]:
node.query(
@ -52,8 +57,8 @@ def test_ttl_table(start_cluster):
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY SETTINGS merge_with_ttl_timeout=0;
'''.format(replica=node.name))
TTL date + INTERVAL 1 DAY {delete_suffix} SETTINGS merge_with_ttl_timeout=0;
'''.format(replica=node.name, delete_suffix=delete_suffix))
node1.query("INSERT INTO test_ttl VALUES (toDateTime('2000-10-10 00:00:00'), 1)")
node1.query("INSERT INTO test_ttl VALUES (toDateTime('2000-10-11 10:00:00'), 2)")
@ -62,4 +67,18 @@ def test_ttl_table(start_cluster):
assert TSV(node1.query("SELECT * FROM test_ttl")) == TSV("")
assert TSV(node2.query("SELECT * FROM test_ttl")) == TSV("")
def test_ttl_double_delete_rule_returns_error(started_cluster):
    """Creating a table whose TTL clause lists two delete rules must fail.

    The CREATE query below specifies ``date + INTERVAL 1 DAY`` and
    ``date + INTERVAL 2 DAY`` in the same table TTL; the test passes only if
    the query raises ``client.QueryRuntimeException``.
    """
    drop_table([node1, node2], "test_ttl")
    # pytest.raises fails the test when nothing is raised and lets any other
    # exception propagate with its original traceback, instead of masking it
    # behind a bare `except:` / `assert False`.
    with pytest.raises(client.QueryRuntimeException):
        node1.query('''
CREATE TABLE test_ttl(date DateTime, id UInt32)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/test_ttl', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date)
TTL date + INTERVAL 1 DAY, date + INTERVAL 2 DAY SETTINGS merge_with_ttl_timeout=0;
'''.format(replica=node1.name))

View File

@ -645,6 +645,10 @@ Maxim Fedotov, Wargaming + Yuri Baranov, Яндекс.
При парсинге запроса преобразовывать синтаксис вида `@@version_full` в вызов функции `getGlobalVariable('version_full')`. Поддержать популярные переменные MySQL. Может быть, поможет Юрий Баранов, если будет энтузиазм.
### 8.23. Подписка для импорта обновляемых и ротируемых логов в ФС.
Желательно 2.15.
## 9. Безопасность.