mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Fix mutation when table contains projections (#33679)
This commit is contained in:
parent
dfd8d4067d
commit
62441f0a0f
@ -226,7 +226,8 @@ bool isStorageTouchedByMutations(
|
||||
/// Interpreter must be alive, when we use result of execute() method.
|
||||
/// For some reason it may copy context and give it into ExpressionTransform
|
||||
/// after that we will use context from destroyed stack frame in our stream.
|
||||
InterpreterSelectQuery interpreter(select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits());
|
||||
InterpreterSelectQuery interpreter(
|
||||
select_query, context_copy, storage, metadata_snapshot, SelectQueryOptions().ignoreLimits().ignoreProjections());
|
||||
auto io = interpreter.execute();
|
||||
PullingPipelineExecutor executor(io.pipeline);
|
||||
|
||||
@ -291,7 +292,7 @@ MutationsInterpreter::MutationsInterpreter(
|
||||
, commands(std::move(commands_))
|
||||
, context(Context::createCopy(context_))
|
||||
, can_execute(can_execute_)
|
||||
, select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits())
|
||||
, select_limits(SelectQueryOptions().analyze(!can_execute).ignoreLimits().ignoreProjections())
|
||||
{
|
||||
mutation_ast = prepare(!can_execute);
|
||||
}
|
||||
@ -732,7 +733,7 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
|
||||
const ASTPtr select_query = prepareInterpreterSelectQuery(stages_copy, /* dry_run = */ true);
|
||||
InterpreterSelectQuery interpreter{
|
||||
select_query, context, storage, metadata_snapshot,
|
||||
SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits()};
|
||||
SelectQueryOptions().analyze(/* dry_run = */ false).ignoreLimits().ignoreProjections()};
|
||||
|
||||
auto first_stage_header = interpreter.getSampleBlock();
|
||||
QueryPlan plan;
|
||||
|
@ -1310,7 +1310,7 @@ void IMergeTreeDataPart::remove() const
|
||||
|
||||
void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_shared_data) const
|
||||
{
|
||||
String to = parent_to + "/" + relative_path;
|
||||
String to = fs::path(parent_to) / relative_path;
|
||||
auto disk = volume->getDisk();
|
||||
if (checksums.empty())
|
||||
{
|
||||
@ -1320,7 +1320,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
|
||||
"Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: checksums.txt is missing",
|
||||
fullPath(disk, to));
|
||||
/// If the part is not completely written, we cannot use fast path by listing files.
|
||||
disk->removeSharedRecursive(to + "/", keep_shared_data);
|
||||
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1333,15 +1333,15 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
|
||||
# pragma GCC diagnostic ignored "-Wunused-variable"
|
||||
#endif
|
||||
for (const auto & [file, _] : checksums.files)
|
||||
disk->removeSharedFile(to + "/" + file, keep_shared_data);
|
||||
disk->removeSharedFile(fs::path(to) / file, keep_shared_data);
|
||||
#if !defined(__clang__)
|
||||
# pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
for (const auto & file : {"checksums.txt", "columns.txt"})
|
||||
disk->removeSharedFile(to + "/" + file, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(to + "/" + DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(to + "/" + DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_shared_data);
|
||||
disk->removeSharedFile(fs::path(to) / file, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DEFAULT_COMPRESSION_CODEC_FILE_NAME, keep_shared_data);
|
||||
disk->removeSharedFileIfExists(fs::path(to) / DELETE_ON_DESTROY_MARKER_FILE_NAME, keep_shared_data);
|
||||
|
||||
disk->removeSharedRecursive(to, keep_shared_data);
|
||||
}
|
||||
@ -1351,7 +1351,7 @@ void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_sh
|
||||
|
||||
LOG_ERROR(storage.log, "Cannot quickly remove directory {} by removing files; fallback to recursive removal. Reason: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
|
||||
|
||||
disk->removeSharedRecursive(to + "/", keep_shared_data);
|
||||
disk->removeSharedRecursive(fs::path(to) / "", keep_shared_data);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -120,7 +120,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
ctx->disk = global_ctx->space_reservation->getDisk();
|
||||
|
||||
String local_part_path = global_ctx->data->relative_data_path;
|
||||
String local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + (global_ctx->parent_part ? ".proj" : "");
|
||||
String local_tmp_part_basename = local_tmp_prefix + global_ctx->future_part->name + local_tmp_suffix;
|
||||
String local_new_part_tmp_path = local_part_path + local_tmp_part_basename + "/";
|
||||
|
||||
if (ctx->disk->exists(local_new_part_tmp_path))
|
||||
|
@ -650,7 +650,6 @@ public:
|
||||
".tmp_proj");
|
||||
|
||||
next_level_parts.push_back(executeHere(tmp_part_merge_task));
|
||||
|
||||
next_level_parts.back()->is_temp = true;
|
||||
}
|
||||
|
||||
@ -1081,9 +1080,7 @@ private:
|
||||
ctx->disk->createDirectories(destination);
|
||||
for (auto p_it = ctx->disk->iterateDirectory(it->path()); p_it->isValid(); p_it->next())
|
||||
{
|
||||
String p_destination = destination + "/";
|
||||
String p_file_name = p_it->name();
|
||||
p_destination += p_it->name();
|
||||
String p_destination = fs::path(destination) / p_it->name();
|
||||
ctx->disk->createHardLink(p_it->path(), p_destination);
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,12 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<log>/var/log/clickhouse-server/log.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
|
||||
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
|
||||
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
|
||||
</logger>
|
||||
|
||||
<path>/var/lib/clickhouse/</path>
|
||||
<users_config>users.xml</users_config>
|
||||
</clickhouse>
|
@ -0,0 +1,11 @@
|
||||
<?xml version="1.0"?>
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<!-- Lower block size to test multi-level merge of projection parts -->
|
||||
<max_block_size>1000</max_block_size>
|
||||
<min_insert_block_size_bytes>1000</min_insert_block_size_bytes>
|
||||
<min_insert_block_size_rows>1000</min_insert_block_size_rows>
|
||||
</default>
|
||||
</profiles>
|
||||
</clickhouse>
|
88
tests/integration/test_mutations_with_projection/test.py
Normal file
88
tests/integration/test_mutations_with_projection/test.py
Normal file
@ -0,0 +1,88 @@
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
|
||||
instance_test_mutations = cluster.add_instance(
|
||||
"test_mutations_with_projection",
|
||||
main_configs=["configs/config.xml"],
|
||||
user_configs=["configs/users.xml"],
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
instance_test_mutations.query(
|
||||
"""
|
||||
CREATE TABLE video_log
|
||||
(
|
||||
`datetime` DateTime, -- 20,000 records per second
|
||||
`user_id` UInt64, -- Cardinality == 100,000,000
|
||||
`device_id` UInt64, -- Cardinality == 200,000,000
|
||||
`video_id` UInt64, -- Cardinality == 100,00000
|
||||
`domain` LowCardinality(String), -- Cardinality == 100
|
||||
`bytes` UInt64, -- Ranging from 128 to 1152
|
||||
`duration` UInt64, -- Ranging from 100 to 400
|
||||
PROJECTION p_norm (SELECT datetime, device_id, bytes, duration ORDER BY device_id),
|
||||
PROJECTION p_agg (SELECT toStartOfHour(datetime) AS hour, domain, sum(bytes), avg(duration) GROUP BY hour, domain)
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
PARTITION BY toDate(datetime) -- Daily partitioning
|
||||
ORDER BY (user_id, device_id, video_id) -- Can only favor one column here
|
||||
SETTINGS index_granularity = 1000;
|
||||
"""
|
||||
)
|
||||
|
||||
instance_test_mutations.query(
|
||||
"""CREATE TABLE rng (`user_id_raw` UInt64, `device_id_raw` UInt64, `video_id_raw` UInt64, `domain_raw` UInt64, `bytes_raw` UInt64, `duration_raw` UInt64) ENGINE = GenerateRandom(1024);"""
|
||||
)
|
||||
|
||||
instance_test_mutations.query(
|
||||
"""INSERT INTO video_log SELECT toUnixTimestamp(toDateTime(today())) + (rowNumberInAllBlocks() / 20000), user_id_raw % 100000000 AS user_id, device_id_raw % 200000000 AS device_id, video_id_raw % 100000000 AS video_id, domain_raw % 100, (bytes_raw % 1024) + 128, (duration_raw % 300) + 100 FROM rng LIMIT 500000;"""
|
||||
)
|
||||
|
||||
instance_test_mutations.query("""OPTIMIZE TABLE video_log FINAL;""")
|
||||
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_mutations_with_multi_level_merge_of_projections(started_cluster):
|
||||
try:
|
||||
instance_test_mutations.query(
|
||||
"""ALTER TABLE video_log UPDATE bytes = bytes + 10086 WHERE 1;"""
|
||||
)
|
||||
|
||||
def count_and_changed():
|
||||
return instance_test_mutations.query(
|
||||
"SELECT count(), countIf(bytes > 10000) FROM video_log SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT CSV"
|
||||
).splitlines()
|
||||
|
||||
all_done = False
|
||||
for wait_times_for_mutation in range(
|
||||
100
|
||||
): # wait for the mutation to finish, 80 seconds max (100 polls x 0.8 s)
|
||||
time.sleep(0.8)
|
||||
|
||||
if count_and_changed() == ["500000,500000"]:
|
||||
all_done = True
|
||||
break
|
||||
|
||||
print(
|
||||
instance_test_mutations.query(
|
||||
"SELECT mutation_id, command, parts_to_do, is_done, latest_failed_part, latest_fail_reason, parts_to_do_names FROM system.mutations WHERE table = 'video_log' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVWithNames"
|
||||
)
|
||||
)
|
||||
|
||||
assert (count_and_changed(), all_done) == (["500000,500000"], True)
|
||||
assert instance_test_mutations.query(
|
||||
f"SELECT DISTINCT arraySort(projections) FROM system.parts WHERE table = 'video_log' SETTINGS force_index_by_date = 0, force_primary_key = 0 FORMAT TSVRaw"
|
||||
).splitlines() == ["['p_agg','p_norm']"]
|
||||
|
||||
finally:
|
||||
instance_test_mutations.query(f"""DROP TABLE video_log""")
|
7
tests/queries/0_stateless/01710_projection_mutation.sql
Normal file
7
tests/queries/0_stateless/01710_projection_mutation.sql
Normal file
@ -0,0 +1,7 @@
|
||||
DROP TABLE IF EXISTS t;
|
||||
|
||||
CREATE TABLE t (`key` UInt32, `created_at` Date, `value` UInt32, PROJECTION xxx (SELECT key, created_at, sum(value) GROUP BY key, created_at)) ENGINE = MergeTree PARTITION BY toYYYYMM(created_at) ORDER BY key;
|
||||
|
||||
INSERT INTO t SELECT 1 AS key, today() + (number % 30), number FROM numbers(1000);
|
||||
|
||||
ALTER TABLE t UPDATE value = 0 WHERE (value > 0) AND (created_at >= '2021-12-21') SETTINGS allow_experimental_projection_optimization = 1;
|
Loading…
Reference in New Issue
Block a user