Merge remote-tracking branch 'origin/master' into omnisci-benchmark

This commit is contained in:
Alexey Milovidov 2020-07-29 02:29:58 +03:00
commit 529bc14cd3
36 changed files with 890 additions and 212 deletions

View File

@ -4,6 +4,8 @@ set -ex
# Use the packaged repository to find the revision we will compare to. # Use the packaged repository to find the revision we will compare to.
function find_reference_sha function find_reference_sha
{ {
git -C right/ch log -1 origin/master
git -C right/ch log -1 pr
# Go back from the revision to be tested, trying to find the closest published # Go back from the revision to be tested, trying to find the closest published
# testing release. The PR branch may be either pull/*/head which is the # testing release. The PR branch may be either pull/*/head which is the
# author's branch, or pull/*/merge, which is head merged with some master # author's branch, or pull/*/merge, which is head merged with some master

View File

@ -7,7 +7,7 @@ toc_title: Yandex.Metrica Data
Dataset consists of two tables containing anonymized data about hits (`hits_v1`) and visits (`visits_v1`) of Yandex.Metrica. You can read more about Yandex.Metrica in [ClickHouse history](../../introduction/history.md) section. Dataset consists of two tables containing anonymized data about hits (`hits_v1`) and visits (`visits_v1`) of Yandex.Metrica. You can read more about Yandex.Metrica in [ClickHouse history](../../introduction/history.md) section.
The dataset consists of two tables, either of them can be downloaded as a compressed `tsv.xz` file or as prepared partitions. In addition to that, an extended version of the `hits` table containing 100 million rows is available as TSV at https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits\_100m\_obfuscated\_v1.tsv.xz and as prepared partitions at https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits\_100m\_obfuscated\_v1.tar.xz. The dataset consists of two tables, either of them can be downloaded as a compressed `tsv.xz` file or as prepared partitions. In addition to that, an extended version of the `hits` table containing 100 million rows is available as TSV at https://clickhouse-datasets.s3.yandex.net/hits/tsv/hits_100m_obfuscated_v1.tsv.xz and as prepared partitions at https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_100m_obfuscated_v1.tar.xz.
## Obtaining Tables from Prepared Partitions {#obtaining-tables-from-prepared-partitions} ## Obtaining Tables from Prepared Partitions {#obtaining-tables-from-prepared-partitions}

View File

@ -1459,6 +1459,20 @@ Possible values:
Default value: 16. Default value: 16.
## parallel_distributed_insert_select {#parallel_distributed_insert_select}
Enables parallel distributed `INSERT ... SELECT` query.
If we execute `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` queries and both tables use the same cluster, and both tables are either [replicated](../../engines/table-engines/mergetree-family/replication.md) or non-replicated, then this query is processed locally on every shard.
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 0.
## insert_distributed_sync {#insert_distributed_sync} ## insert_distributed_sync {#insert_distributed_sync}
Enables or disables synchronous data insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table. Enables or disables synchronous data insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table.

View File

@ -50,6 +50,6 @@ CurrentMetric_ReplicatedChecks: 0
**See also** **See also**
- [system.asynchronous\_metrics](../../operations/system-tables/asynchronous_metrics.md) — Contains periodically calculated metrics. - [system.asynchronous\_metrics](../../operations/system-tables/asynchronous_metrics.md) — Contains periodically calculated metrics.
- [system.events](../../operations/system-tables/events.md) — Contains a number of events that occurred. - [system.events](../../operations/system-tables/events.md#system_tables-events) — Contains a number of events that occurred.
- [system.metrics](../../operations/system-tables/metrics.md) — Contains instantly calculated metrics. - [system.metrics](../../operations/system-tables/metrics.md) — Contains instantly calculated metrics.
- [Monitoring](../../operations/monitoring.md) — Base concepts of ClickHouse monitoring. - [Monitoring](../../operations/monitoring.md) — Base concepts of ClickHouse monitoring.

View File

@ -1278,6 +1278,19 @@ Default value: 0.
Значение по умолчанию: 16. Значение по умолчанию: 16.
## parallel_distributed_insert_select {#parallel_distributed_insert_select}
Включает параллельную обработку распределённых запросов `INSERT ... SELECT`.
Если при выполнении запроса `INSERT INTO distributed_table_a SELECT ... FROM distributed_table_b` оказывается, что обе таблицы находятся в одном кластере, то независимо от того [реплицируемые](../../engines/table-engines/mergetree-family/replication.md) они или нет, запрос выполняется локально на каждом шарде.
Допустимые значения:
- 0 — выключена.
- 1 — включена.
Значение по умолчанию: 0.
## insert_distributed_sync {#insert_distributed_sync} ## insert_distributed_sync {#insert_distributed_sync}
Включает или отключает режим синхронного добавления данных в распределенные таблицы (таблицы с движком [Distributed](../../engines/table-engines/special/distributed.md#distributed)). Включает или отключает режим синхронного добавления данных в распределенные таблицы (таблицы с движком [Distributed](../../engines/table-engines/special/distributed.md#distributed)).

View File

@ -1,6 +1,4 @@
#include <Access/IAccessStorage.h> #include <Access/IAccessStorage.h>
#include <Access/User.h>
#include <Access/Role.h>
#include <Common/Exception.h> #include <Common/Exception.h>
#include <Common/quoteString.h> #include <Common/quoteString.h>
#include <IO/WriteHelpers.h> #include <IO/WriteHelpers.h>
@ -39,110 +37,71 @@ namespace
} }
template <typename Func, typename ResultType = std::result_of_t<Func()>> template <typename Func>
ResultType doTry(const Func & func) bool tryCall(const Func & function)
{ {
try try
{ {
return func(); function();
return true;
} }
catch (Exception &) catch (...)
{ {
return {}; return false;
} }
} }
template <bool ignore_errors, typename T, typename ApplyFunc, typename GetNameFunc = std::nullptr_t, class ErrorsTracker
typename ResultTypeOfApplyFunc = std::result_of_t<ApplyFunc(T)>,
typename ResultType = std::conditional_t<std::is_same_v<ResultTypeOfApplyFunc, void>, void, std::vector<ResultTypeOfApplyFunc>>>
ResultType applyToMultipleEntities(
const std::vector<T> & multiple_entities,
const ApplyFunc & apply_function,
const char * error_message_format [[maybe_unused]] = nullptr,
const GetNameFunc & get_name_function [[maybe_unused]] = nullptr)
{ {
std::optional<Exception> exception; public:
std::vector<bool> success; explicit ErrorsTracker(size_t count_) { succeed.reserve(count_); }
auto helper = [&](const auto & apply_and_store_result_function) template <typename Func>
bool tryCall(const Func & func)
{ {
for (size_t i = 0; i != multiple_entities.size(); ++i) try
{ {
try func();
{
apply_and_store_result_function(multiple_entities[i]);
if constexpr (!ignore_errors)
success[i] = true;
}
catch (Exception & e)
{
if (!ignore_errors && !exception)
exception.emplace(e);
}
catch (Poco::Exception & e)
{
if (!ignore_errors && !exception)
exception.emplace(Exception::CreateFromPocoTag{}, e);
}
catch (std::exception & e)
{
if (!ignore_errors && !exception)
exception.emplace(Exception::CreateFromSTDTag{}, e);
}
} }
}; catch (Exception & e)
if constexpr (std::is_same_v<ResultType, void>)
{
if (multiple_entities.empty())
return;
if (multiple_entities.size() == 1)
{ {
apply_function(multiple_entities.front()); if (!exception)
return; exception.emplace(e);
succeed.push_back(false);
return false;
} }
catch (Poco::Exception & e)
if constexpr (!ignore_errors)
success.resize(multiple_entities.size(), false);
helper(apply_function);
if (ignore_errors || !exception)
return;
}
else
{
ResultType result;
if (multiple_entities.empty())
return result;
if (multiple_entities.size() == 1)
{ {
result.emplace_back(apply_function(multiple_entities.front())); if (!exception)
return result; exception.emplace(Exception::CreateFromPocoTag{}, e);
succeed.push_back(false);
return false;
} }
catch (std::exception & e)
result.reserve(multiple_entities.size()); {
if constexpr (!ignore_errors) if (!exception)
success.resize(multiple_entities.size(), false); exception.emplace(Exception::CreateFromSTDTag{}, e);
succeed.push_back(false);
helper([&](const T & entity) { result.emplace_back(apply_function(entity)); }); return false;
}
if (ignore_errors || !exception) succeed.push_back(true);
return result; return true;
} }
if constexpr (!ignore_errors) bool errors() const { return exception.has_value(); }
void showErrors(const char * format, const std::function<String(size_t)> & get_name_function)
{ {
if (!exception)
return;
Strings succeeded_names_list; Strings succeeded_names_list;
Strings failed_names_list; Strings failed_names_list;
for (size_t i = 0; i != multiple_entities.size(); ++i) for (size_t i = 0; i != succeed.size(); ++i)
{ {
const auto & entity = multiple_entities[i]; String name = get_name_function(i);
String name = get_name_function(entity); if (succeed[i])
if (success[i])
succeeded_names_list.emplace_back(name); succeeded_names_list.emplace_back(name);
else else
failed_names_list.emplace_back(name); failed_names_list.emplace_back(name);
@ -152,14 +111,17 @@ namespace
if (succeeded_names.empty()) if (succeeded_names.empty())
succeeded_names = "none"; succeeded_names = "none";
String error_message = error_message_format; String error_message = format;
boost::replace_all(error_message, "{succeeded_names}", succeeded_names); boost::replace_all(error_message, "{succeeded_names}", succeeded_names);
boost::replace_all(error_message, "{failed_names}", failed_names); boost::replace_all(error_message, "{failed_names}", failed_names);
exception->addMessage(error_message); exception->addMessage(error_message);
exception->rethrow(); exception->rethrow();
} }
__builtin_unreachable();
} private:
std::vector<bool> succeed;
std::optional<Exception> exception;
};
} }
@ -216,7 +178,11 @@ bool IAccessStorage::exists(const UUID & id) const
AccessEntityPtr IAccessStorage::tryReadBase(const UUID & id) const AccessEntityPtr IAccessStorage::tryReadBase(const UUID & id) const
{ {
return doTry([&] { return readImpl(id); }); AccessEntityPtr entity;
auto func = [&] { entity = readImpl(id); };
if (!tryCall(func))
return nullptr;
return entity;
} }
@ -228,7 +194,11 @@ String IAccessStorage::readName(const UUID & id) const
std::optional<String> IAccessStorage::tryReadName(const UUID & id) const std::optional<String> IAccessStorage::tryReadName(const UUID & id) const
{ {
return doTry([&] { return std::optional<String>{readNameImpl(id)}; }); String name;
auto func = [&] { name = readNameImpl(id); };
if (!tryCall(func))
return {};
return name;
} }
@ -240,41 +210,77 @@ UUID IAccessStorage::insert(const AccessEntityPtr & entity)
std::vector<UUID> IAccessStorage::insert(const std::vector<AccessEntityPtr> & multiple_entities) std::vector<UUID> IAccessStorage::insert(const std::vector<AccessEntityPtr> & multiple_entities)
{ {
return applyToMultipleEntities</* ignore_errors = */ false>( ErrorsTracker tracker(multiple_entities.size());
multiple_entities,
[this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ false); }, std::vector<UUID> ids;
"Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", for (const auto & entity : multiple_entities)
[](const AccessEntityPtr & entity) { return entity->outputTypeAndName(); }); {
UUID id;
auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); };
if (tracker.tryCall(func))
ids.push_back(id);
}
if (tracker.errors())
{
auto get_name_function = [&](size_t i) { return multiple_entities[i]->outputTypeAndName(); };
tracker.showErrors("Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", get_name_function);
}
return ids;
} }
std::optional<UUID> IAccessStorage::tryInsert(const AccessEntityPtr & entity) std::optional<UUID> IAccessStorage::tryInsert(const AccessEntityPtr & entity)
{ {
return doTry([&] { return std::optional<UUID>{insertImpl(entity, false)}; }); UUID id;
auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); };
if (!tryCall(func))
return {};
return id;
} }
std::vector<UUID> IAccessStorage::tryInsert(const std::vector<AccessEntityPtr> & multiple_entities) std::vector<UUID> IAccessStorage::tryInsert(const std::vector<AccessEntityPtr> & multiple_entities)
{ {
return applyToMultipleEntities</* ignore_errors = */ true>( std::vector<UUID> ids;
multiple_entities, for (const auto & entity : multiple_entities)
[this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ false); }); {
UUID id;
auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ false); };
if (tryCall(func))
ids.push_back(id);
}
return ids;
} }
UUID IAccessStorage::insertOrReplace(const AccessEntityPtr & entity) UUID IAccessStorage::insertOrReplace(const AccessEntityPtr & entity)
{ {
return insertImpl(entity, true); return insertImpl(entity, /* replace_if_exists = */ true);
} }
std::vector<UUID> IAccessStorage::insertOrReplace(const std::vector<AccessEntityPtr> & multiple_entities) std::vector<UUID> IAccessStorage::insertOrReplace(const std::vector<AccessEntityPtr> & multiple_entities)
{ {
return applyToMultipleEntities</* ignore_errors = */ false>( ErrorsTracker tracker(multiple_entities.size());
multiple_entities,
[this](const AccessEntityPtr & entity) { return insertImpl(entity, /* replace_if_exists = */ true); }, std::vector<UUID> ids;
"Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", for (const auto & entity : multiple_entities)
[](const AccessEntityPtr & entity) -> String { return entity->outputTypeAndName(); }); {
UUID id;
auto func = [&] { id = insertImpl(entity, /* replace_if_exists = */ true); };
if (tracker.tryCall(func))
ids.push_back(id);
}
if (tracker.errors())
{
auto get_name_function = [&](size_t i) { return multiple_entities[i]->outputTypeAndName(); };
tracker.showErrors("Couldn't insert {failed_names}. Successfully inserted: {succeeded_names}", get_name_function);
}
return ids;
} }
@ -286,25 +292,39 @@ void IAccessStorage::remove(const UUID & id)
void IAccessStorage::remove(const std::vector<UUID> & ids) void IAccessStorage::remove(const std::vector<UUID> & ids)
{ {
applyToMultipleEntities</* ignore_errors = */ false>( ErrorsTracker tracker(ids.size());
ids,
[this](const UUID & id) { removeImpl(id); }, for (const auto & id : ids)
"Couldn't remove {failed_names}. Successfully removed: {succeeded_names}", {
[this](const UUID & id) { return outputTypeAndNameOrID(*this, id); }); auto func = [&] { removeImpl(id); };
tracker.tryCall(func);
}
if (tracker.errors())
{
auto get_name_function = [&](size_t i) { return outputTypeAndNameOrID(*this, ids[i]); };
tracker.showErrors("Couldn't remove {failed_names}. Successfully removed: {succeeded_names}", get_name_function);
}
} }
bool IAccessStorage::tryRemove(const UUID & id) bool IAccessStorage::tryRemove(const UUID & id)
{ {
return doTry([&] { removeImpl(id); return true; }); auto func = [&] { removeImpl(id); };
return tryCall(func);
} }
std::vector<UUID> IAccessStorage::tryRemove(const std::vector<UUID> & ids) std::vector<UUID> IAccessStorage::tryRemove(const std::vector<UUID> & ids)
{ {
return applyToMultipleEntities</* ignore_errors = */ true>( std::vector<UUID> removed_ids;
ids, for (const auto & id : ids)
[this](const UUID & id) { removeImpl(id); return id; }); {
auto func = [&] { removeImpl(id); };
if (tryCall(func))
removed_ids.push_back(id);
}
return removed_ids;
} }
@ -316,25 +336,39 @@ void IAccessStorage::update(const UUID & id, const UpdateFunc & update_func)
void IAccessStorage::update(const std::vector<UUID> & ids, const UpdateFunc & update_func) void IAccessStorage::update(const std::vector<UUID> & ids, const UpdateFunc & update_func)
{ {
applyToMultipleEntities</* ignore_errors = */ false>( ErrorsTracker tracker(ids.size());
ids,
[this, &update_func](const UUID & id) { updateImpl(id, update_func); }, for (const auto & id : ids)
"Couldn't update {failed_names}. Successfully updated: {succeeded_names}", {
[this](const UUID & id) { return outputTypeAndNameOrID(*this, id); }); auto func = [&] { updateImpl(id, update_func); };
tracker.tryCall(func);
}
if (tracker.errors())
{
auto get_name_function = [&](size_t i) { return outputTypeAndNameOrID(*this, ids[i]); };
tracker.showErrors("Couldn't update {failed_names}. Successfully updated: {succeeded_names}", get_name_function);
}
} }
bool IAccessStorage::tryUpdate(const UUID & id, const UpdateFunc & update_func) bool IAccessStorage::tryUpdate(const UUID & id, const UpdateFunc & update_func)
{ {
return doTry([&] { updateImpl(id, update_func); return true; }); auto func = [&] { updateImpl(id, update_func); };
return tryCall(func);
} }
std::vector<UUID> IAccessStorage::tryUpdate(const std::vector<UUID> & ids, const UpdateFunc & update_func) std::vector<UUID> IAccessStorage::tryUpdate(const std::vector<UUID> & ids, const UpdateFunc & update_func)
{ {
return applyToMultipleEntities</* ignore_errors = */ true>( std::vector<UUID> updated_ids;
ids, for (const auto & id : ids)
[this, &update_func](const UUID & id) { updateImpl(id, update_func); return id; }); {
auto func = [&] { updateImpl(id, update_func); };
if (tryCall(func))
updated_ids.push_back(id);
}
return updated_ids;
} }

View File

@ -349,7 +349,7 @@ void ColumnTuple::updatePermutation(bool reverse, size_t limit, int nan_directio
for (const auto& column : columns) for (const auto& column : columns)
{ {
column->updatePermutation(reverse, limit, nan_direction_hint, res, equal_range); column->updatePermutation(reverse, limit, nan_direction_hint, res, equal_range);
while (limit && limit <= equal_range.back().first) while (limit && !equal_range.empty() && limit <= equal_range.back().first)
equal_range.pop_back(); equal_range.pop_back();
if (equal_range.empty()) if (equal_range.empty())

View File

@ -1309,7 +1309,7 @@ void ExpressionActionsChain::finalize()
} }
} }
std::string ExpressionActionsChain::dumpChain() std::string ExpressionActionsChain::dumpChain() const
{ {
std::stringstream ss; std::stringstream ss;

View File

@ -347,7 +347,7 @@ struct ExpressionActionsChain
return steps.back(); return steps.back();
} }
std::string dumpChain(); std::string dumpChain() const;
}; };
} }

View File

@ -764,4 +764,23 @@ std::optional<SortDescription> MutationsInterpreter::getStorageSortDescriptionIf
return sort_description; return sort_description;
} }
/// Returns true when every name in `storage_columns` is present in this
/// stage's `output_columns` (the storage columns form a subset of them).
bool MutationsInterpreter::Stage::isAffectingAllColumns(const Names & storage_columns) const
{
    for (const auto & column_name : storage_columns)
    {
        /// A single storage column missing from the output is enough to answer "no".
        if (output_columns.count(column_name) == 0)
            return false;
    }

    return true;
}
/// Reports whether the last mutation stage covers all physical columns of the storage.
/// Throws LOGICAL_ERROR when the interpreter has no stages.
bool MutationsInterpreter::isAffectingAllColumns() const
{
    /// Physical column names are fetched first; the emptiness check follows.
    const auto physical_column_names = metadata_snapshot->getColumns().getNamesOfPhysical();

    if (stages.empty())
    {
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutation interpreter has no stages");
    }

    const auto & last_stage = stages.back();
    return last_stage.isAffectingAllColumns(physical_column_names);
}
} }

View File

@ -42,6 +42,9 @@ public:
/// Only changed columns. /// Only changed columns.
const Block & getUpdatedHeader() const; const Block & getUpdatedHeader() const;
/// Latest mutation stage affects all columns in storage
bool isAffectingAllColumns() const;
private: private:
ASTPtr prepare(bool dry_run); ASTPtr prepare(bool dry_run);
@ -86,8 +89,8 @@ private:
ASTs filters; ASTs filters;
std::unordered_map<String, ASTPtr> column_to_updated; std::unordered_map<String, ASTPtr> column_to_updated;
/// Contains columns that are changed by this stage, /// Contains columns that are changed by this stage, columns changed by
/// columns changed by the previous stages and also columns needed by the next stages. /// the previous stages and also columns needed by the next stages.
NameSet output_columns; NameSet output_columns;
std::unique_ptr<ExpressionAnalyzer> analyzer; std::unique_ptr<ExpressionAnalyzer> analyzer;
@ -97,6 +100,9 @@ private:
/// then there is (possibly) an UPDATE step, and finally a projection step. /// then there is (possibly) an UPDATE step, and finally a projection step.
ExpressionActionsChain expressions_chain; ExpressionActionsChain expressions_chain;
Names filter_column_names; Names filter_column_names;
/// Check that stage affects all storage columns
bool isAffectingAllColumns(const Names & storage_columns) const;
}; };
std::unique_ptr<Block> updated_header; std::unique_ptr<Block> updated_header;

View File

@ -654,7 +654,7 @@ void QueryPipeline::unitePipelines(
if (extremes.size() == 1) if (extremes.size() == 1)
extremes_port = extremes.back(); extremes_port = extremes.back();
else else
extremes_port = uniteExtremes(extremes, current_header, processors); extremes_port = uniteExtremes(extremes, common_header, processors);
} }
if (!totals.empty()) if (!totals.empty())
@ -662,7 +662,7 @@ void QueryPipeline::unitePipelines(
if (totals.size() == 1) if (totals.size() == 1)
totals_having_port = totals.back(); totals_having_port = totals.back();
else else
totals_having_port = uniteTotals(totals, current_header, processors); totals_having_port = uniteTotals(totals, common_header, processors);
} }
current_header = common_header; current_header = common_header;

View File

@ -552,6 +552,30 @@ void IMergeTreeDataPart::loadRowsCount()
auto buf = openForReading(volume->getDisk(), path); auto buf = openForReading(volume->getDisk(), path);
readIntText(rows_count, *buf); readIntText(rows_count, *buf);
assertEOF(*buf); assertEOF(*buf);
#ifndef NDEBUG
/// columns have to be loaded
for (const auto & column : getColumns())
{
/// Most trivial types
if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes())
{
auto size = getColumnSize(column.name, *column.type);
if (size.data_uncompressed == 0)
continue;
size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory();
if (rows_in_column != rows_count)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Column {} has rows count {} according to size in memory "
"and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count);
}
}
}
#endif
} }
else else
{ {

View File

@ -2408,11 +2408,6 @@ static void loadPartAndFixMetadataImpl(MergeTreeData::MutableDataPartPtr part)
auto disk = part->volume->getDisk(); auto disk = part->volume->getDisk();
String full_part_path = part->getFullRelativePath(); String full_part_path = part->getFullRelativePath();
/// Earlier the list of columns was written incorrectly. Delete it and re-create.
/// But in compact parts we can't get list of columns without this file.
if (isWidePart(part))
disk->removeIfExists(full_part_path + "columns.txt");
part->loadColumnsChecksumsIndexes(false, true); part->loadColumnsChecksumsIndexes(false, true);
part->modification_time = disk->getLastModified(full_part_path).epochTime(); part->modification_time = disk->getLastModified(full_part_path).epochTime();
} }

View File

@ -30,13 +30,15 @@
#include <Common/interpolate.h> #include <Common/interpolate.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <cmath>
#include <numeric>
#include <iomanip>
#include <cmath>
#include <ctime>
#include <iomanip>
#include <numeric>
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
namespace ProfileEvents namespace ProfileEvents
{ {
extern const Event MergedRows; extern const Event MergedRows;
@ -219,14 +221,13 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
return false; return false;
} }
time_t current_time = time(nullptr); time_t current_time = std::time(nullptr);
IMergeSelector::Partitions partitions; IMergeSelector::Partitions partitions;
const String * prev_partition_id = nullptr; const String * prev_partition_id = nullptr;
/// Previous part only in boundaries of partition frame /// Previous part only in boundaries of partition frame
const MergeTreeData::DataPartPtr * prev_part = nullptr; const MergeTreeData::DataPartPtr * prev_part = nullptr;
bool has_part_with_expired_ttl = false;
for (const MergeTreeData::DataPartPtr & part : data_parts) for (const MergeTreeData::DataPartPtr & part : data_parts)
{ {
/// Check predicate only for first part in each partition. /// Check predicate only for first part in each partition.
@ -258,11 +259,6 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
part_info.min_ttl = part->ttl_infos.part_min_ttl; part_info.min_ttl = part->ttl_infos.part_min_ttl;
part_info.max_ttl = part->ttl_infos.part_max_ttl; part_info.max_ttl = part->ttl_infos.part_max_ttl;
time_t ttl = data_settings->ttl_only_drop_parts ? part_info.max_ttl : part_info.min_ttl;
if (ttl && ttl <= current_time)
has_part_with_expired_ttl = true;
partitions.back().emplace_back(part_info); partitions.back().emplace_back(part_info);
/// Check for consistency of data parts. If assertion is failed, it requires immediate investigation. /// Check for consistency of data parts. If assertion is failed, it requires immediate investigation.
@ -275,38 +271,38 @@ bool MergeTreeDataMergerMutator::selectPartsToMerge(
prev_part = &part; prev_part = &part;
} }
std::unique_ptr<IMergeSelector> merge_selector; IMergeSelector::PartsInPartition parts_to_merge;
SimpleMergeSelector::Settings merge_settings; if (!ttl_merges_blocker.isCancelled())
if (aggressive)
merge_settings.base = 1;
bool can_merge_with_ttl =
(current_time - last_merge_with_ttl > data_settings->merge_with_ttl_timeout);
/// NOTE Could allow selection of different merge strategy.
if (can_merge_with_ttl && has_part_with_expired_ttl && !ttl_merges_blocker.isCancelled())
{ {
merge_selector = std::make_unique<TTLMergeSelector>(current_time, data_settings->ttl_only_drop_parts); TTLMergeSelector merge_selector(
last_merge_with_ttl = current_time; next_ttl_merge_times_by_partition,
current_time,
data_settings->merge_with_ttl_timeout,
data_settings->ttl_only_drop_parts);
parts_to_merge = merge_selector.select(partitions, max_total_size_to_merge);
} }
else
merge_selector = std::make_unique<SimpleMergeSelector>(merge_settings);
IMergeSelector::PartsInPartition parts_to_merge = merge_selector->select(
partitions,
max_total_size_to_merge);
if (parts_to_merge.empty()) if (parts_to_merge.empty())
{ {
if (out_disable_reason) SimpleMergeSelector::Settings merge_settings;
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm"; if (aggressive)
return false; merge_settings.base = 1;
}
/// Allow to "merge" part with itself if we need remove some values with expired ttl parts_to_merge = SimpleMergeSelector(merge_settings)
if (parts_to_merge.size() == 1 && !has_part_with_expired_ttl) .select(partitions, max_total_size_to_merge);
throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
/// Do not allow to "merge" part with itself for regular merges, unless it is a TTL-merge where it is ok to remove some values with expired ttl
if (parts_to_merge.size() == 1)
throw Exception("Logical error: merge selector returned only one part to merge", ErrorCodes::LOGICAL_ERROR);
if (parts_to_merge.empty())
{
if (out_disable_reason)
*out_disable_reason = "There is no need to merge parts according to merge selector algorithm";
return false;
}
}
MergeTreeData::DataPartsVector parts; MergeTreeData::DataPartsVector parts;
parts.reserve(parts_to_merge.size()); parts.reserve(parts_to_merge.size());
@ -1092,7 +1088,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
need_remove_expired_values = true; need_remove_expired_values = true;
/// All columns from part are changed and may be some more that were missing before in part /// All columns from part are changed and may be some more that were missing before in part
if (!isWidePart(source_part) || source_part->getColumns().isSubsetOf(updated_header.getNamesAndTypesList())) if (!isWidePart(source_part) || (interpreter && interpreter->isAffectingAllColumns()))
{ {
auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames); auto part_indices = getIndicesForNewDataPart(metadata_snapshot->getSecondaryIndices(), for_file_renames);
mutateAllPartColumns( mutateAllPartColumns(
@ -1478,13 +1474,14 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
return updated_header.getNamesAndTypesList(); return updated_header.getNamesAndTypesList();
NameSet removed_columns; NameSet removed_columns;
NameToNameMap renamed_columns; NameToNameMap renamed_columns_to_from;
/// All commands are validated in AlterCommand so we don't care about order
for (const auto & command : commands_for_removes) for (const auto & command : commands_for_removes)
{ {
if (command.type == MutationCommand::DROP_COLUMN) if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name); removed_columns.insert(command.column_name);
if (command.type == MutationCommand::RENAME_COLUMN) if (command.type == MutationCommand::RENAME_COLUMN)
renamed_columns.emplace(command.rename_to, command.column_name); renamed_columns_to_from.emplace(command.rename_to, command.column_name);
} }
Names source_column_names = source_part->getColumns().getNames(); Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
@ -1497,17 +1494,49 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
it->type = updated_type; it->type = updated_type;
++it; ++it;
} }
else if (source_columns_name_set.count(it->name) && !removed_columns.count(it->name))
{
++it;
}
else if (renamed_columns.count(it->name) && source_columns_name_set.count(renamed_columns[it->name]))
{
++it;
}
else else
{ {
it = storage_columns.erase(it); if (!source_columns_name_set.count(it->name))
{
/// Source part doesn't have column but some other column
/// was renamed to its name.
auto renamed_it = renamed_columns_to_from.find(it->name);
if (renamed_it != renamed_columns_to_from.end()
&& source_columns_name_set.count(renamed_it->second))
++it;
else
it = storage_columns.erase(it);
}
else
{
bool was_renamed = false;
bool was_removed = removed_columns.count(it->name);
/// Check that this column was renamed to some other name
for (const auto & [rename_to, rename_from] : renamed_columns_to_from)
{
if (rename_from == it->name)
{
was_renamed = true;
break;
}
}
/// If some other column is renamed to this column's name, then the
/// previous column with this name must itself be renamed or dropped
if (renamed_columns_to_from.count(it->name) && !was_renamed && !was_removed)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Incorrect mutation commands, trying to rename column {} to {}, but part {} already has column {}", renamed_columns_to_from[it->name], it->name, source_part->name, it->name);
/// Column was renamed and no other column renamed to its name
/// or column is dropped.
if (!renamed_columns_to_from.count(it->name) && (was_renamed || was_removed))
it = storage_columns.erase(it);
else
++it;
}
} }
} }

View File

@ -5,6 +5,7 @@
#include <atomic> #include <atomic>
#include <functional> #include <functional>
#include <Common/ActionBlocker.h> #include <Common/ActionBlocker.h>
#include <Storages/MergeTree/TTLMergeSelector.h>
namespace DB namespace DB
@ -242,8 +243,10 @@ private:
/// When the last time you wrote to the log that the disk space was running out (not to write about this too often). /// When the last time you wrote to the log that the disk space was running out (not to write about this too often).
time_t disk_space_warning_time = 0; time_t disk_space_warning_time = 0;
/// Last time when TTLMergeSelector has been used /// Stores the next TTL merge due time for each partition (used only by TTLMergeSelector)
time_t last_merge_with_ttl = 0; TTLMergeSelector::PartitionIdToTTLs next_ttl_merge_times_by_partition;
/// Performing TTL merges independently for each partition guarantees that
/// there is only a limited number of TTL merges and no partition stores data, that is too stale
}; };

View File

@ -13,6 +13,7 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA; extern const int CANNOT_READ_ALL_DATA;
extern const int NO_FILE_IN_DATA_PART; extern const int NO_FILE_IN_DATA_PART;
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART; extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int LOGICAL_ERROR;
} }
@ -237,6 +238,21 @@ void MergeTreeDataPartWide::calculateEachColumnSizes(ColumnSizeByName & each_col
ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams); ColumnSize size = getColumnSizeImpl(column.name, *column.type, &processed_substreams);
each_columns_size[column.name] = size; each_columns_size[column.name] = size;
total_size.add(size); total_size.add(size);
#ifndef NDEBUG
/// Most trivial types
if (rows_count != 0 && column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes())
{
size_t rows_in_column = size.data_uncompressed / column.type->getSizeOfValueInMemory();
if (rows_in_column != rows_count)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Column {} has rows count {} according to size in memory "
"and size of single value, but data part {} has {} rows", backQuote(column.name), rows_in_column, name, rows_count);
}
}
#endif
} }
} }

View File

@ -1552,6 +1552,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
part->index_granularity_info.index_granularity_bytes); part->index_granularity_info.index_granularity_bytes);
size_t granules_dropped = 0; size_t granules_dropped = 0;
size_t total_granules = 0;
size_t marks_count = part->getMarksCount(); size_t marks_count = part->getMarksCount();
size_t final_mark = part->index_granularity.hasFinalMark(); size_t final_mark = part->index_granularity.hasFinalMark();
@ -1578,6 +1579,8 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
if (last_index_mark != index_range.begin || !granule) if (last_index_mark != index_range.begin || !granule)
reader.seek(index_range.begin); reader.seek(index_range.begin);
total_granules += index_range.end - index_range.begin;
for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark) for (size_t index_mark = index_range.begin; index_mark < index_range.end; ++index_mark)
{ {
if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin) if (index_mark != index_range.begin || !granule || last_index_mark != index_range.begin)
@ -1602,7 +1605,7 @@ MarkRanges MergeTreeDataSelectExecutor::filterMarksUsingIndex(
last_index_mark = index_range.end - 1; last_index_mark = index_range.end - 1;
} }
LOG_DEBUG(log, "Index {} has dropped {} granules.", backQuote(index_helper->index.name), granules_dropped); LOG_DEBUG(log, "Index {} has dropped {} / {} granules.", backQuote(index_helper->index.name), granules_dropped, total_granules);
return res; return res;
} }

View File

@ -1,12 +1,20 @@
#include <Storages/MergeTree/TTLMergeSelector.h> #include <Storages/MergeTree/TTLMergeSelector.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <cmath>
#include <algorithm> #include <algorithm>
#include <cmath>
namespace DB namespace DB
{ {
/// Returns the partition id of the data part referenced by a merge selector entry.
/// `part_info.data` is reinterpreted as a pointer to MergeTreeData::DataPartPtr;
/// NOTE(review): presumably the code that fills `Part::data` always stores that type - confirm.
const String & getPartitionIdForPart(const TTLMergeSelector::Part & part_info)
{
    const auto * data_part_holder = static_cast<const MergeTreeData::DataPartPtr *>(part_info.data);
    return (*data_part_holder)->info.partition_id;
}
IMergeSelector::PartsInPartition TTLMergeSelector::select( IMergeSelector::PartsInPartition TTLMergeSelector::select(
const Partitions & partitions, const Partitions & partitions,
const size_t max_total_size_to_merge) const size_t max_total_size_to_merge)
@ -18,15 +26,24 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
for (size_t i = 0; i < partitions.size(); ++i) for (size_t i = 0; i < partitions.size(); ++i)
{ {
for (auto it = partitions[i].begin(); it != partitions[i].end(); ++it) const auto & mergeable_parts_in_partition = partitions[i];
if (mergeable_parts_in_partition.empty())
continue;
const auto & partition_id = getPartitionIdForPart(mergeable_parts_in_partition.front());
const auto & next_merge_time_for_partition = merge_due_times[partition_id];
if (next_merge_time_for_partition > current_time)
continue;
for (Iterator part_it = mergeable_parts_in_partition.cbegin(); part_it != mergeable_parts_in_partition.cend(); ++part_it)
{ {
time_t ttl = only_drop_parts ? it->max_ttl : it->min_ttl; time_t ttl = only_drop_parts ? part_it->max_ttl : part_it->min_ttl;
if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl)) if (ttl && (partition_to_merge_index == -1 || ttl < partition_to_merge_min_ttl))
{ {
partition_to_merge_min_ttl = ttl; partition_to_merge_min_ttl = ttl;
partition_to_merge_index = i; partition_to_merge_index = i;
best_begin = it; best_begin = part_it;
} }
} }
} }
@ -68,6 +85,9 @@ IMergeSelector::PartsInPartition TTLMergeSelector::select(
++best_end; ++best_end;
} }
const auto & best_partition_id = getPartitionIdForPart(best_partition.front());
merge_due_times[best_partition_id] = current_time + merge_cooldown_time;
return PartsInPartition(best_begin, best_end); return PartsInPartition(best_begin, best_end);
} }

View File

@ -1,7 +1,10 @@
#pragma once #pragma once
#include <Core/Types.h>
#include <Storages/MergeTree/MergeSelector.h> #include <Storages/MergeTree/MergeSelector.h>
#include <map>
namespace DB namespace DB
{ {
@ -10,17 +13,27 @@ namespace DB
* It selects parts to merge by greedy algorithm: * It selects parts to merge by greedy algorithm:
* 1. Finds part with the most earliest expired ttl and includes it to result. * 1. Finds part with the most earliest expired ttl and includes it to result.
* 2. Tries to find the longest range of parts with expired ttl, that includes part from step 1. * 2. Tries to find the longest range of parts with expired ttl, that includes part from step 1.
* Finally, merge selector updates TTL merge timer for the selected partition
*/ */
class TTLMergeSelector : public IMergeSelector class TTLMergeSelector : public IMergeSelector
{ {
public: public:
explicit TTLMergeSelector(time_t current_time_, bool only_drop_parts_) : current_time(current_time_), only_drop_parts(only_drop_parts_) {} using PartitionIdToTTLs = std::map<String, time_t>;
explicit TTLMergeSelector(PartitionIdToTTLs & merge_due_times_, time_t current_time_, Int64 merge_cooldown_time_, bool only_drop_parts_)
: merge_due_times(merge_due_times_),
current_time(current_time_),
merge_cooldown_time(merge_cooldown_time_),
only_drop_parts(only_drop_parts_) {}
PartsInPartition select( PartsInPartition select(
const Partitions & partitions, const Partitions & partitions,
const size_t max_total_size_to_merge) override; const size_t max_total_size_to_merge) override;
private: private:
PartitionIdToTTLs & merge_due_times;
time_t current_time; time_t current_time;
Int64 merge_cooldown_time;
bool only_drop_parts; bool only_drop_parts;
}; };

View File

@ -155,8 +155,6 @@ class _NetworkManager:
def __init__( def __init__(
self, self,
image_name='clickhouse_tests_helper',
image_path=p.join(CLICKHOUSE_ROOT_DIR, 'docker', 'test', 'integration', 'helper_container'),
container_expire_timeout=50, container_exit_timeout=60): container_expire_timeout=50, container_exit_timeout=60):
self.container_expire_timeout = container_expire_timeout self.container_expire_timeout = container_expire_timeout
@ -164,14 +162,6 @@ class _NetworkManager:
self._docker_client = docker.from_env(version=os.environ.get("DOCKER_API_VERSION")) self._docker_client = docker.from_env(version=os.environ.get("DOCKER_API_VERSION"))
try:
self._image = self._docker_client.images.get(image_name)
except docker.errors.ImageNotFound:
# Use docker console client instead of python API to work around https://github.com/docker/docker-py/issues/1397
subprocess.check_call(
['docker', 'build', '--force-rm', '--tag', image_name, '--network', 'host', image_path])
self._image = self._docker_client.images.get(image_name)
self._container = None self._container = None
self._ensure_container() self._ensure_container()
@ -185,15 +175,11 @@ class _NetworkManager:
except docker.errors.NotFound: except docker.errors.NotFound:
pass pass
# Work around https://github.com/docker/docker-py/issues/1477 self._container = self._docker_client.containers.run('yandex/clickhouse-integration-helper', auto_remove=True,
host_config = self._docker_client.api.create_host_config(network_mode='host', auto_remove=True) command=('sleep %s' % self.container_exit_timeout),
container_id = self._docker_client.api.create_container( detach=True, network_mode='host')
self._image.id, command=('sleep %s' % self.container_exit_timeout), container_id = self._container.id
detach=True, host_config=host_config)['Id']
self._container_expire_time = time.time() + self.container_expire_timeout self._container_expire_time = time.time() + self.container_expire_timeout
self._docker_client.api.start(container_id)
self._container = self._docker_client.containers.get(container_id)
return self._container return self._container

View File

@ -0,0 +1,152 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True)
node2 = cluster.add_instance('node2', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True)
node3 = cluster.add_instance('node3', with_zookeeper=True, image='yandex/clickhouse-server:19.4.5.35', stay_alive=True, with_installed_binary=True)
node4 = cluster.add_instance('node4')
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, always shut it down."""
    try:
        cluster.start()

        yield cluster

    finally:
        # Runs even when start-up or a test fails.
        cluster.shutdown()
def _check_attach_of_frozen_part_after_upgrade(node, dest_table_ddl, restart_callback=None):
    """Shared scenario for the three "backup from old version" tests.

    Steps, all executed on `node`:
      1. create `source_table`, insert one row, add a column and FREEZE the partition;
      2. restart the instance with the latest version (optionally passing
         `restart_callback` as `callback_onstop`);
      3. create `dest_table` with `dest_table_ddl`, insert one row;
      4. copy the frozen part into `dest_table/detached` and ATTACH it;
      5. DETACH/ATTACH once more and run CHECK TABLE.

    :param node: cluster instance the whole scenario runs on.
    :param dest_table_ddl: full CREATE TABLE statement for `dest_table`.
    :param restart_callback: optional callable forwarded as `callback_onstop`.
    """
    node.query("CREATE TABLE source_table(A Int64, B String) Engine = MergeTree order by tuple()")
    node.query("INSERT INTO source_table VALUES(1, '1')")
    assert node.query("SELECT COUNT() FROM source_table") == "1\n"

    node.query("ALTER TABLE source_table ADD COLUMN Y String")
    node.query("ALTER TABLE source_table FREEZE PARTITION tuple();")

    if restart_callback is None:
        node.restart_with_latest_version()
    else:
        node.restart_with_latest_version(callback_onstop=restart_callback)

    node.query(dest_table_ddl)
    node.query("INSERT INTO dest_table VALUES(2, '2', 'Hello')")
    assert node.query("SELECT COUNT() FROM dest_table") == "1\n"

    node.exec_in_container(['bash', '-c', 'cp -r /var/lib/clickhouse/shadow/1/data/default/source_table/all_1_1_0/ /var/lib/clickhouse/data/default/dest_table/detached'])

    # Copying into detached/ alone must not change the visible data.
    assert node.query("SELECT COUNT() FROM dest_table") == "1\n"

    node.query("ALTER TABLE dest_table ATTACH PARTITION tuple()")
    assert node.query("SELECT sum(A) FROM dest_table") == "3\n"

    node.query("ALTER TABLE dest_table DETACH PARTITION tuple()")
    node.query("ALTER TABLE dest_table ATTACH PARTITION tuple()")
    assert node.query("SELECT sum(A) FROM dest_table") == "3\n"

    # CHECK TABLE runs on the same node that owns dest_table
    # (the copy-pasted originals always queried node1 here).
    assert node.query("CHECK TABLE dest_table") == "1\n"


def test_backup_from_old_version(started_cluster):
    """Frozen part attached to a table created with default settings (node1)."""
    _check_attach_of_frozen_part_after_upgrade(
        node1,
        "CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table1', '1') ORDER BY tuple()")


def test_backup_from_old_version_setting(started_cluster):
    """Same scenario on node2, with enable_mixed_granularity_parts set in the table DDL."""
    _check_attach_of_frozen_part_after_upgrade(
        node2,
        "CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table2', '1') ORDER BY tuple() SETTINGS enable_mixed_granularity_parts = 1")


def test_backup_from_old_version_config(started_cluster):
    """Same scenario on node3, with the setting also written to the server config on restart."""
    def callback(n):
        n.replace_config("/etc/clickhouse-server/merge_tree_settings.xml", "<yandex><merge_tree><enable_mixed_granularity_parts>1</enable_mixed_granularity_parts></merge_tree></yandex>")

    _check_attach_of_frozen_part_after_upgrade(
        node3,
        "CREATE TABLE dest_table (A Int64, B String, Y String) ENGINE = ReplicatedMergeTree('/test/dest_table3', '1') ORDER BY tuple() SETTINGS enable_mixed_granularity_parts = 1",
        restart_callback=callback)
def test_backup_and_alter(started_cluster):
    """Freeze a part, alter the table schema, then re-attach the frozen part on node4.

    After the freeze, column C is dropped and column B is changed to UInt64;
    the attached part must still be readable through the altered schema.
    """
    preparation = (
        "CREATE TABLE backup_table(A Int64, B String, C Date) Engine = MergeTree order by tuple()",
        "INSERT INTO backup_table VALUES(2, '2', toDate('2019-10-01'))",
        "ALTER TABLE backup_table FREEZE PARTITION tuple();",
        "ALTER TABLE backup_table DROP COLUMN C",
        "ALTER TABLE backup_table MODIFY COLUMN B UInt64",
        "ALTER TABLE backup_table DROP PARTITION tuple()",
    )
    for statement in preparation:
        node4.query(statement)

    copy_frozen_part = 'cp -r /var/lib/clickhouse/shadow/1/data/default/backup_table/all_1_1_0/ /var/lib/clickhouse/data/default/backup_table/detached'
    node4.exec_in_container(['bash', '-c', copy_frozen_part])
    node4.query("ALTER TABLE backup_table ATTACH PARTITION tuple()")

    sum_of_a = node4.query("SELECT sum(A) FROM backup_table")
    assert sum_of_a == "2\n"

    b_plus_two = node4.query("SELECT B + 2 FROM backup_table")
    assert b_plus_two == "4\n"

View File

@ -0,0 +1,133 @@
import pytest
import os
import time
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import assert_eq_with_retry
from multiprocessing.dummy import Pool
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1')
@pytest.fixture(scope="module")
def started_cluster():
    """Module-scoped fixture: start the cluster, hand it to the tests, always shut it down."""
    try:
        cluster.start()

        yield cluster

    finally:
        # Runs even when start-up or a test fails.
        cluster.shutdown()
def check_hardlinks(table, part_path, column_file, count):
    """Check inside the node1 container how many paths share the inode of a column file.

    The generated shell script looks up the inode of
    /var/lib/clickhouse/data/default/<table>/<part_path>/<column_file>, counts the
    entries under /var/lib/clickhouse that have the same inode and runs
    `test` to compare that number with `count`.
    NOTE(review): presumably exec_in_container raises when the script exits non-zero - confirm.
    """
    target_file = os.path.join("/var/lib/clickhouse/data/default", table, part_path, column_file)
    script_template = "\n".join([
        "",
        "export INODE=`ls -i {column_path} | awk '{{print $1}}'`",
        "export COUNT=`find /var/lib/clickhouse -inum $INODE | wc -l`",
        "test $COUNT = {count}",
        "",
    ])
    shell_script = script_template.format(column_path=target_file, count=count)
    node1.exec_in_container(["bash", "-c", shell_script])
def check_exists(table, part_path, column_file):
    """Run `test -f` inside the node1 container for a column file of the given part.

    The tests in this file wrap the call in pytest.raises to assert that a file is absent.
    """
    target_file = os.path.join("/var/lib/clickhouse/data/default", table, part_path, column_file)
    command = "test -f {}".format(target_file)
    node1.exec_in_container(["bash", "-c", command])
def test_update_mutation(started_cluster):
    """UPDATE mutations: verify the data and the link count of each column file.

    After updating only value1, key.bin and value2.bin are expected to have two
    links and value1.bin one; after an UPDATE touching every column all three
    files are expected to have a single link.
    """
    node1.query("CREATE TABLE table_for_update(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()")
    node1.query("INSERT INTO table_for_update SELECT number, number, toString(number) from numbers(100)")

    def sum_of_value1():
        return int(node1.query("SELECT sum(value1) FROM table_for_update").strip())

    assert sum_of_value1() == sum(range(100))

    node1.query("ALTER TABLE table_for_update UPDATE value1 = value1 * value1 WHERE 1", settings={"mutations_sync" : "2"})
    assert sum_of_value1() == sum(i * i for i in range(100))

    for column_file, links in (("key.bin", 2), ("value2.bin", 2), ("value1.bin", 1)):
        check_hardlinks("table_for_update", "all_1_1_0_2", column_file, links)

    node1.query("ALTER TABLE table_for_update UPDATE key=key, value1=value1, value2=value2 WHERE 1", settings={"mutations_sync": "2"})
    assert sum_of_value1() == sum(i * i for i in range(100))

    for column_file in ("key.bin", "value1.bin", "value2.bin"):
        check_hardlinks("table_for_update", "all_1_1_0_3", column_file, 1)
def test_modify_mutation(started_cluster):
    """MODIFY COLUMN on value2: untouched column files keep two links, value2.bin has one."""
    node1.query("CREATE TABLE table_for_modify(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()")
    node1.query("INSERT INTO table_for_modify SELECT number, number, toString(number) from numbers(100)")

    expected_sum = sum(range(100))
    value1_sum = node1.query("SELECT sum(value1) FROM table_for_modify").strip()
    assert int(value1_sum) == expected_sum

    node1.query("ALTER TABLE table_for_modify MODIFY COLUMN value2 UInt64", settings={"mutations_sync" : "2"})

    # value2 is numeric after the conversion, so it can be summed directly.
    value2_sum = node1.query("SELECT sum(value2) FROM table_for_modify").strip()
    assert int(value2_sum) == expected_sum

    for column_file, links in (("key.bin", 2), ("value1.bin", 2), ("value2.bin", 1)):
        check_hardlinks("table_for_modify", "all_1_1_0_2", column_file, links)
def test_drop_mutation(started_cluster):
    """DROP COLUMN value2: remaining column files keep two links, value2 files are gone."""
    node1.query("CREATE TABLE table_for_drop(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()")
    node1.query("INSERT INTO table_for_drop SELECT number, number, toString(number) from numbers(100)")

    value1_sum = node1.query("SELECT sum(value1) FROM table_for_drop").strip()
    assert int(value1_sum) == sum(range(100))

    node1.query("ALTER TABLE table_for_drop DROP COLUMN value2", settings={"mutations_sync": "2"})

    for kept_file in ("key.bin", "value1.bin"):
        check_hardlinks("table_for_drop", "all_1_1_0_2", kept_file, 2)

    # check_exists must fail for every file of the dropped column.
    for dropped_file in ("value2.bin", "value2.mrk"):
        with pytest.raises(Exception):
            check_exists("table_for_drop", "all_1_1_0_2", dropped_file)
def test_delete_and_drop_mutation(started_cluster):
    """DELETE and DROP COLUMN issued in one ALTER while merges are stopped.

    The ALTER is sent from a background thread. The test polls system.mutations
    until two unfinished mutations are registered for the table (at most 99
    polls, 0.5 s apart), then starts merges again and checks the resulting
    row count and the column files of part all_1_1_0_3.
    """
    node1.query("CREATE TABLE table_for_delete_and_drop(key UInt64, value1 UInt64, value2 String) ENGINE MergeTree() ORDER BY tuple()")
    node1.query("INSERT INTO table_for_delete_and_drop SELECT number, number, toString(number) from numbers(100)")

    assert int(node1.query("SELECT sum(value1) FROM table_for_delete_and_drop").strip()) == sum(range(100))

    node1.query("SYSTEM STOP MERGES")

    def mutate():
        node1.query("ALTER TABLE table_for_delete_and_drop DELETE WHERE key % 2 == 0, DROP COLUMN value2")

    # NOTE(review): presumably the ALTER blocks while merges are stopped, hence the worker thread.
    p = Pool(2)
    p.apply_async(mutate)
    # No more tasks will be submitted; lets the worker threads exit once mutate() returns.
    p.close()

    for _ in range(1, 100):
        result = node1.query("SELECT COUNT() FROM system.mutations WHERE table = 'table_for_delete_and_drop' and is_done=0")
        try:
            if int(result.strip()) == 2:
                break
        except ValueError:
            # Non-numeric answer: log it and keep polling.
            # Function-call form of print works on both Python 2 and Python 3.
            print("Result {}".format(result))

        time.sleep(0.5)

    node1.query("SYSTEM START MERGES")

    assert_eq_with_retry(node1, "SELECT COUNT() FROM table_for_delete_and_drop", str(sum(1 for i in range(100) if i % 2 != 0)))

    check_hardlinks("table_for_delete_and_drop", "all_1_1_0_3", "key.bin", 1)
    check_hardlinks("table_for_delete_and_drop", "all_1_1_0_3", "value1.bin", 1)

    with pytest.raises(Exception):
        check_exists("table_for_delete_and_drop", "all_1_1_0_3", "value2.bin")
    with pytest.raises(Exception):
        check_exists("table_for_delete_and_drop", "all_1_1_0_3", "value2.mrk")

View File

@ -5,10 +5,12 @@ import helpers.client as client
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV from helpers.test_tools import TSV
cluster = ClickHouseCluster(__file__) cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_zookeeper=True) node1 = cluster.add_instance('node1', with_zookeeper=True)
node2 = cluster.add_instance('node2', with_zookeeper=True) node2 = cluster.add_instance('node2', with_zookeeper=True)
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def started_cluster(): def started_cluster():
try: try:
@ -22,11 +24,13 @@ def started_cluster():
finally: finally:
cluster.shutdown() cluster.shutdown()
def drop_table(nodes, table_name): def drop_table(nodes, table_name):
for node in nodes: for node in nodes:
node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name)) node.query("DROP TABLE IF EXISTS {} NO DELAY".format(table_name))
time.sleep(1) time.sleep(1)
def test_ttl_columns(started_cluster): def test_ttl_columns(started_cluster):
drop_table([node1, node2], "test_ttl") drop_table([node1, node2], "test_ttl")
for node in [node1, node2]: for node in [node1, node2]:
@ -47,6 +51,40 @@ def test_ttl_columns(started_cluster):
assert TSV(node2.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected) assert TSV(node2.query("SELECT id, a, b FROM test_ttl ORDER BY id")) == TSV(expected)
def test_merge_with_ttl_timeout(started_cluster):
    """Check how many rows get their TTL applied across two waiting periods.

    Three single-row partitions are inserted while TTL merges are stopped, TTL
    merges are then started and the test waits 15 s; three more rows are inserted
    followed by another 15 s wait. Both replicas must then report exactly three
    rows with column `a` reset to 0.
    """
    table = "test_merge_with_ttl_timeout"
    replicas = [node1, node2]
    drop_table(replicas, table)

    for replica in replicas:
        replica.query(
            '''
CREATE TABLE {table}(date DateTime, id UInt32, a Int32 TTL date + INTERVAL 1 DAY, b Int32 TTL date + INTERVAL 1 MONTH)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/test/{table}', '{replica}')
ORDER BY id PARTITION BY toDayOfMonth(date);
'''.format(replica=replica.name, table=table))

    def insert_one_row_per_day():
        # Days 1..3 of October 2000, always through node1.
        for day in range(1, 4):
            node1.query("INSERT INTO {table} VALUES (toDateTime('2000-10-{day:02d} 10:00:00'), 1, 2, 3)".format(day=day, table=table))

    def rows_with_expired_a(node):
        return node.query("SELECT countIf(a = 0) FROM {table}".format(table=table))

    for replica in replicas:
        replica.query("SYSTEM STOP TTL MERGES {table}".format(table=table))

    insert_one_row_per_day()

    for replica in replicas:
        assert rows_with_expired_a(replica) == "0\n"

    for replica in replicas:
        replica.query("SYSTEM START TTL MERGES {table}".format(table=table))

    time.sleep(15) # TTL merges shall happen.

    insert_one_row_per_day()

    time.sleep(15) # TTL merges shall not happen.

    for replica in replicas:
        assert rows_with_expired_a(replica) == "3\n"
def test_ttl_many_columns(started_cluster): def test_ttl_many_columns(started_cluster):
drop_table([node1, node2], "test_ttl_2") drop_table([node1, node2], "test_ttl_2")
for node in [node1, node2]: for node in [node1, node2]:

View File

@ -81,6 +81,15 @@ CREATE USER u6_01292 DEFAULT ROLE NONE
-- complex -- complex
CREATE USER u1_01292 IDENTIFIED WITH plaintext_password HOST LOCAL SETTINGS readonly = 1 CREATE USER u1_01292 IDENTIFIED WITH plaintext_password HOST LOCAL SETTINGS readonly = 1
CREATE USER u1_01292 HOST LIKE \'%.%.myhost.com\' DEFAULT ROLE NONE SETTINGS PROFILE default CREATE USER u1_01292 HOST LIKE \'%.%.myhost.com\' DEFAULT ROLE NONE SETTINGS PROFILE default
-- if not exists
CREATE USER u1_01292
GRANT r1_01292 TO u1_01292
-- if not exists-part2
CREATE USER u1_01292
GRANT r1_01292, r2_01292 TO u1_01292
-- or replace
CREATE USER u1_01292
CREATE USER u2_01292
-- multiple users in one command -- multiple users in one command
CREATE USER u1_01292 DEFAULT ROLE NONE CREATE USER u1_01292 DEFAULT ROLE NONE
CREATE USER u2_01292 DEFAULT ROLE NONE CREATE USER u2_01292 DEFAULT ROLE NONE

View File

@ -177,6 +177,24 @@ ALTER USER u1_01292 NOT IDENTIFIED HOST LIKE '%.%.myhost.com' DEFAULT ROLE NONE
SHOW CREATE USER u1_01292; SHOW CREATE USER u1_01292;
DROP USER u1_01292; DROP USER u1_01292;
SELECT '-- if not exists';
CREATE USER u1_01292;
GRANT r1_01292 TO u1_01292;
SHOW CREATE USER u1_01292;
SHOW GRANTS FOR u1_01292;
SELECT '-- if not exists-part2';
CREATE USER IF NOT EXISTS u1_01292;
GRANT r2_01292 TO u1_01292;
SHOW CREATE USER u1_01292;
SHOW GRANTS FOR u1_01292;
SELECT '-- or replace';
CREATE USER OR REPLACE u1_01292;
SHOW CREATE USER u1_01292;
SHOW GRANTS FOR u1_01292;
CREATE USER IF NOT EXISTS u2_01292;
SHOW CREATE USER u2_01292;
DROP USER u1_01292, u2_01292;
SELECT '-- multiple users in one command'; SELECT '-- multiple users in one command';
CREATE USER u1_01292, u2_01292 DEFAULT ROLE NONE; CREATE USER u1_01292, u2_01292 DEFAULT ROLE NONE;
CREATE USER u3_01292, u4_01292 HOST LIKE '%.%.myhost.com'; CREATE USER u3_01292, u4_01292 HOST LIKE '%.%.myhost.com';

View File

@ -0,0 +1,14 @@
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `renamed_value1` String,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nSETTINGS index_granularity = 8192
1
CREATE TABLE default.sticking_mutations\n(\n `date` Date,\n `key` UInt64,\n `value1` UInt64,\n `value2` UInt8\n)\nENGINE = MergeTree()\nORDER BY key\nTTL date + toIntervalDay(1)\nSETTINGS index_granularity = 8192

View File

@ -0,0 +1,70 @@
#!/usr/bin/env bash
set -e
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations"
# Runs one "sticking mutation" scenario.
#   $1 - the ALTER query that is started in the background while merges are stopped.
# Recreates sticking_mutations, fills it with 2000 rows, stops merges, launches $1,
# waits until system.mutations reports an unfinished mutation for the table,
# starts merges again, then prints a checksum predicate and SHOW CREATE TABLE
# and finally drops the table.
check_sticky_mutations()
{
    $CLICKHOUSE_CLIENT -n --query "CREATE TABLE sticking_mutations (
date Date,
key UInt64,
value1 String,
value2 UInt8
)
ENGINE = MergeTree()
ORDER BY key;"

    $CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT toDate('2020-07-10'), number, toString(number), number % 128 FROM numbers(1000)"
    $CLICKHOUSE_CLIENT --query "INSERT INTO sticking_mutations SELECT toDate('2100-01-10'), number, toString(number), number % 128 FROM numbers(1000)"

    # With merges stopped, mutations of a plain MergeTree table stay pending.
    $CLICKHOUSE_CLIENT --query "SYSTEM STOP MERGES sticking_mutations"

    $CLICKHOUSE_CLIENT --query "$1" &

    ##### wait for the mutation to be registered #####
    check_query="SELECT count() FROM system.mutations WHERE table='sticking_mutations' and database='$CLICKHOUSE_DATABASE' and is_done = 0"
    query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)

    while [ "$query_result" == "0" ]; do
        query_result=$($CLICKHOUSE_CLIENT --query="$check_query" 2>&1)
        sleep 0.5
    done
    ##### mutation is registered #####

    # Start merges again so the pending mutations get executed.
    $CLICKHOUSE_CLIENT --query "SYSTEM START MERGES sticking_mutations"

    # A synchronous mutation, to be sure the previous ones have finished.
    $CLICKHOUSE_CLIENT --query "ALTER TABLE sticking_mutations DELETE WHERE value2 % 31 == 0 SETTINGS mutations_sync = 1"

    $CLICKHOUSE_CLIENT --query "OPTIMIZE TABLE sticking_mutations FINAL"

    $CLICKHOUSE_CLIENT --query "SELECT sum(cityHash64(*)) > 1 FROM sticking_mutations WHERE key > 10"

    $CLICKHOUSE_CLIENT --query "SHOW CREATE TABLE sticking_mutations"

    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS sticking_mutations"
}
check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, MODIFY COLUMN value1 UInt64"
check_sticky_mutations "ALTER TABLE sticking_mutations MODIFY COLUMN value1 UInt64, DELETE WHERE value2 % 32 == 0"
check_sticky_mutations "ALTER TABLE sticking_mutations UPDATE value1 = 15 WHERE key < 2000, DELETE WHERE value2 % 32 == 0"
check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, UPDATE value1 = 15 WHERE key < 2000"
check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, DROP COLUMN value1"
check_sticky_mutations "ALTER TABLE sticking_mutations DELETE WHERE value2 % 32 == 0, RENAME COLUMN value1 TO renamed_value1"
check_sticky_mutations "ALTER TABLE sticking_mutations MODIFY COLUMN value1 UInt64, MODIFY TTL date + INTERVAL 1 DAY"

View File

@ -0,0 +1,63 @@
-- UNION ALL of two INNER JOIN subqueries whose branches provide the
-- Nullable(Int8) columns either as real columns or as `null` literals.
-- No rows are inserted, so both queries run over empty tables.

-- Start from a clean state.
DROP TABLE IF EXISTS tableCommon;
DROP TABLE IF EXISTS tableTrees;
DROP TABLE IF EXISTS tableFlowers;
CREATE TABLE tableCommon (`key` FixedString(15), `value` Nullable(Int8)) ENGINE = Log();
CREATE TABLE tableTrees (`key` FixedString(15), `name` Nullable(Int8), `name2` Nullable(Int8)) ENGINE = Log();
CREATE TABLE tableFlowers (`key` FixedString(15), `name` Nullable(Int8)) ENGINE = Log();
-- Case 1: the second branch supplies `null` literals for both name and name2.
SELECT * FROM (
SELECT common.key, common.value, trees.name, trees.name2
FROM (
SELECT *
FROM tableCommon
) as common
INNER JOIN (
SELECT *
FROM tableTrees
) trees ON (common.key = trees.key)
)
UNION ALL
(
SELECT common.key, common.value,
null as name, null as name2
FROM (
SELECT *
FROM tableCommon
) as common
INNER JOIN (
SELECT *
FROM tableFlowers
) flowers ON (common.key = flowers.key)
);
-- Case 2: the second branch takes name from tableFlowers and only name2 is a `null` literal.
SELECT * FROM (
SELECT common.key, common.value, trees.name, trees.name2
FROM (
SELECT *
FROM tableCommon
) as common
INNER JOIN (
SELECT *
FROM tableTrees
) trees ON (common.key = trees.key)
)
UNION ALL
(
SELECT common.key, common.value,
flowers.name, null as name2
FROM (
SELECT *
FROM tableCommon
) as common
INNER JOIN (
SELECT *
FROM tableFlowers
) flowers ON (common.key = flowers.key)
);
-- Clean up.
DROP TABLE IF EXISTS tableCommon;
DROP TABLE IF EXISTS tableTrees;
DROP TABLE IF EXISTS tableFlowers;

View File

@ -0,0 +1 @@
(1,1,0)

View File

@ -0,0 +1 @@
select tuple(1, 1, number) as t from numbers_mt(1000001) order by t, number limit 1;

View File

@ -55,8 +55,7 @@
"01200_mutations_memory_consumption", "01200_mutations_memory_consumption",
"01103_check_cpu_instructions_at_startup", "01103_check_cpu_instructions_at_startup",
"01037_polygon_dicts_", "01037_polygon_dicts_",
"hyperscan", "hyperscan"
"00992_system_parts_race_condition_zookeeper"
], ],
"unbundled-build": [ "unbundled-build": [
"00429", "00429",

View File

@ -167,17 +167,20 @@ class Cluster(object):
self.docker_compose += f" --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\"" self.docker_compose += f" --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
self.lock = threading.Lock() self.lock = threading.Lock()
def shell(self, node): def shell(self, node, timeout=120):
"""Returns unique shell terminal to be used. """Returns unique shell terminal to be used.
""" """
if node is None: if node is None:
return Shell() return Shell()
return Shell(command=[ shell = Shell(command=[
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting" "/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting"
], name=node) ], name=node)
def bash(self, node, timeout=60): shell.timeout = timeout
return shell
def bash(self, node, timeout=120):
"""Returns thread-local bash terminal """Returns thread-local bash terminal
to a specific node. to a specific node.