more fixes

This commit is contained in:
Alexander Tokmakov 2021-10-05 00:13:18 +03:00
parent b702f7cbff
commit bb32432943
8 changed files with 55 additions and 50 deletions

View File

@ -75,9 +75,13 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
stopwatch_ptr = std::make_unique<Stopwatch>();
fake_query_context = Context::createCopy(storage.getContext());
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_mutated_part, metadata_snapshot, commands, merge_mutate_entry.get(),
entry.create_time, storage.getContext(), reserved_space, table_lock_holder);
entry.create_time, fake_query_context, reserved_space, table_lock_holder);
return {true, [this] (const ExecutionStatus & execution_status)
{

View File

@ -42,6 +42,7 @@ private:
MergeTreeData::MutableDataPartPtr new_part{nullptr};
FutureMergedMutatedPartPtr future_mutated_part{nullptr};
ContextMutablePtr fake_query_context;
MutateTaskPtr mutate_task;
};

View File

@ -43,9 +43,13 @@ void MutatePlainMergeTreeTask::prepare()
merge_list_entry.get());
};
fake_query_context = Context::createCopy(storage.getContext());
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
time(nullptr), storage.getContext(), merge_mutate_entry->tagger->reserved_space, table_lock_holder);
time(nullptr), fake_query_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
}
bool MutatePlainMergeTreeTask::executeStep()

View File

@ -74,6 +74,7 @@ private:
IExecutableTask::TaskResultCallback task_result_callback;
ContextMutablePtr fake_query_context;
MutateTaskPtr mutate_task;
};

View File

@ -927,12 +927,16 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
{
try
{
auto fake_query_context = Context::createCopy(getContext());
fake_query_context->makeQueryContext();
fake_query_context->setCurrentQueryId("");
MutationsInterpreter interpreter(
shared_from_this(), metadata_snapshot, commands_for_size_validation, getContext(), false);
shared_from_this(), metadata_snapshot, commands_for_size_validation, fake_query_context, false);
commands_size += interpreter.evaluateCommandsSize();
}
catch (...)
{
tryLogCurrentException(log);
MergeTreeMutationEntry & entry = it->second;
entry.latest_fail_time = time(nullptr);
entry.latest_fail_reason = getCurrentExceptionMessage(false);
@ -965,49 +969,6 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMutate(
return {};
}
bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder)
{
auto & future_part = merge_mutate_entry.future_part;
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
Stopwatch stopwatch;
MutableDataPartPtr new_part;
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
writePartLog(
PartLogElement::MUTATE_PART,
execution_status,
stopwatch.elapsed(),
future_part->name,
new_part,
future_part->parts,
merge_list_entry.get());
};
try
{
auto task = merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry.commands, merge_list_entry.get(),
time(nullptr), getContext(), merge_mutate_entry.tagger->reserved_space, table_lock_holder);
new_part = executeHere(task);
renameTempPartAndReplace(new_part);
updateMutationEntriesErrors(future_part, true, "");
write_part_log({});
}
catch (...)
{
updateMutationEntriesErrors(future_part, false, getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException());
throw;
}
return true;
}
bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assignee) //-V657
{
if (shutdown_called)

View File

@ -178,7 +178,6 @@ private:
std::shared_ptr<MergeMutateSelectedEntry> selectPartsToMutate(const StorageMetadataPtr & metadata_snapshot, String * disable_reason, TableLockHolder & table_lock_holder);
bool mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & entry, TableLockHolder & table_lock_holder);
Int64 getCurrentMutationVersion(
const DataPartPtr & part,

View File

@ -1,8 +1,32 @@
MergeTree
1
2
2
0
50 6225 0
0
50 6225 1900
ReplicatedMergeTree
1
2
2
0
50 6225 0
2
50 6225 0
Memory
1
2
2
0
50 6225 0
0
50 6225 1900
Join
1
2
2
0
50 6225 0
0
50 6225 0

View File

@ -4,18 +4,29 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
declare -a engines=("MergeTree" "ReplicatedMergeTree('/test/01162/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '1')")
declare -a engines=("MergeTree order by n" "ReplicatedMergeTree('/test/01162/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX', '1') order by n" "Memory" "Join(ALL, FULL, n)")
$CLICKHOUSE_CLIENT -q "CREATE OR REPLACE VIEW t1 AS SELECT number * 10 AS id, number * 100 AS value FROM numbers(20)"
for engine in "${engines[@]}"
do
$CLICKHOUSE_CLIENT -q "drop table if exists t"
$CLICKHOUSE_CLIENT -q "create table t (n int) engine=$engine order by n"
$CLICKHOUSE_CLIENT -q "create table t (n int) engine=$engine"
$CLICKHOUSE_CLIENT -q "select engine from system.tables where database=currentDatabase() and name='t'"
$CLICKHOUSE_CLIENT -q "insert into t values (1)"
$CLICKHOUSE_CLIENT -q "insert into t values (2)"
$CLICKHOUSE_CLIENT -q "select * from t order by n"
$CLICKHOUSE_CLIENT --mutations_sync=1 -q "alter table t delete where n global in (select * from (select * from t where n global in (1::Int32)))"
$CLICKHOUSE_CLIENT -q "select * from t order by n"
$CLICKHOUSE_CLIENT --mutations_sync=1 -q "alter table t delete where n global in (select t1.n from t as t1 join t as t2 on t1.n=t2.n where t1.n global in (select 2::Int32))"
$CLICKHOUSE_CLIENT --mutations_sync=1 -q "alter table t delete where n global in (select t1.n from t as t1 full join t as t2 on t1.n=t2.n where t1.n global in (select 2::Int32))"
$CLICKHOUSE_CLIENT -q "select count() from t"
$CLICKHOUSE_CLIENT -q "drop table t"
$CLICKHOUSE_CLIENT -q "drop table if exists test"
$CLICKHOUSE_CLIENT -q "CREATE TABLE test ENGINE=$engine AS SELECT number + 100 AS n, 0 AS test FROM numbers(50)"
$CLICKHOUSE_CLIENT -q "select count(), sum(n), sum(test) from test"
# FIXME it's not clear if the following query should fail or not
$CLICKHOUSE_CLIENT --mutations_sync=1 -q "ALTER TABLE test UPDATE test = (SELECT groupArray(id) FROM t1 GROUP BY 1)[n - 99] WHERE 1" 2>&1| grep -c "Unknown function"
$CLICKHOUSE_CLIENT -q "select count(), sum(n), sum(test) from test"
$CLICKHOUSE_CLIENT -q "drop table test"
done