diff --git a/docker/test/stress/stress b/docker/test/stress/stress index 6d90b9d5437..64cca4beb3a 100755 --- a/docker/test/stress/stress +++ b/docker/test/stress/stress @@ -77,7 +77,7 @@ def run_func_test( pipes = [] for i in range(0, len(output_paths)): f = open(output_paths[i], "w") - full_command = "{} {} {} {} {}".format( + full_command = "{} {} {} {} {} --stress".format( cmd, get_options(i, backward_compatibility_check), global_time_limit_option, diff --git a/docs/en/sql-reference/statements/alter/ttl.md b/docs/en/sql-reference/statements/alter/ttl.md index 37a171d4969..2682279d1f7 100644 --- a/docs/en/sql-reference/statements/alter/ttl.md +++ b/docs/en/sql-reference/statements/alter/ttl.md @@ -34,7 +34,7 @@ CREATE TABLE table_with_ttl ) ENGINE MergeTree() ORDER BY tuple() -TTL event_time + INTERVAL 3 MONTH; +TTL event_time + INTERVAL 3 MONTH SETTINGS min_bytes_for_wide_part = 0; INSERT INTO table_with_ttl VALUES (now(), 1, 'username1'); diff --git a/src/Processors/Formats/IOutputFormat.cpp b/src/Processors/Formats/IOutputFormat.cpp index 3c4e6861151..47ebaa9c5f5 100644 --- a/src/Processors/Formats/IOutputFormat.cpp +++ b/src/Processors/Formats/IOutputFormat.cpp @@ -73,7 +73,6 @@ void IOutputFormat::work() setRowsBeforeLimit(rows_before_limit_counter->get()); finalize(); - finalized = true; return; } @@ -120,9 +119,12 @@ void IOutputFormat::write(const Block & block) void IOutputFormat::finalize() { + if (finalized) + return; writePrefixIfNot(); writeSuffixIfNot(); finalizeImpl(); + finalized = true; } } diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 57e893e9683..5098dfd3ef1 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -427,18 +427,37 @@ public: void consume(Chunk chunk) override { + std::lock_guard lock(cancel_mutex); + if (cancelled) + return; writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } + void onCancel() override + { + std::lock_guard lock(cancel_mutex); + finalize(); + cancelled = true; + } + void onException() override { - if (!writer) - return; - onFinish(); + std::lock_guard lock(cancel_mutex); + finalize(); } void onFinish() override { + std::lock_guard lock(cancel_mutex); + finalize(); + } + +private: + void finalize() + { + if (!writer) + return; + try { writer->finalize(); @@ -454,9 +473,10 @@ public: } } -private: std::unique_ptr write_buf; OutputFormatPtr writer; + std::mutex cancel_mutex; + bool cancelled = false; }; class PartitionedHDFSSink : public PartitionedSink diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.cpp b/src/Storages/MergeTree/IMergeTreeDataPart.cpp index 416ef0614f6..0a05e061365 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPart.cpp @@ -929,7 +929,6 @@ void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) c void IMergeTreeDataPart::loadChecksums(bool require) { - //const String path = fs::path(getRelativePath()) / "checksums.txt"; bool exists = metadata_manager->exists("checksums.txt"); if (exists) { @@ -940,7 +939,7 @@ void IMergeTreeDataPart::loadChecksums(bool require) bytes_on_disk = checksums.getTotalSizeOnDisk(); } else - bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk(); //calculateTotalSizeOnDisk(volume->getDisk(), getRelativePath()); + bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk(); } else { diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index 7d0c37051e7..85fefab82ae 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -611,7 +611,7 @@ void finalizeMutatedPart( { auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings()); DB::writeText(queryToString(codec->getFullCodecDesc()), *out); - } + } /// close fd { /// Write a file with a description of columns. @@ -625,9 +625,14 @@ void finalizeMutatedPart( new_data_part->minmax_idx = source_part->minmax_idx; new_data_part->modification_time = time(nullptr); new_data_part->loadProjections(false, false); - new_data_part->setBytesOnDisk(new_data_part->data_part_storage->calculateTotalSizeOnDisk()); - new_data_part->default_codec = codec; + + /// All information about sizes is stored in checksums. + /// It doesn't make sense to touch filesystem for sizes. + new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk()); + /// Also use information from checksums new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk(); + + new_data_part->default_codec = codec; } } diff --git a/src/Storages/StorageFile.cpp b/src/Storages/StorageFile.cpp index d138104018a..672727b1478 100644 --- a/src/Storages/StorageFile.cpp +++ b/src/Storages/StorageFile.cpp @@ -810,18 +810,37 @@ public: void consume(Chunk chunk) override { + std::lock_guard cancel_lock(cancel_mutex); + if (cancelled) + return; writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } + void onCancel() override + { + std::lock_guard cancel_lock(cancel_mutex); + finalize(); + cancelled = true; + } + void onException() override { - if (!writer) - return; - onFinish(); + std::lock_guard cancel_lock(cancel_mutex); + finalize(); } void onFinish() override { + std::lock_guard cancel_lock(cancel_mutex); + finalize(); + } + +private: + void finalize() + { + if (!writer) + return; + try { writer->finalize(); @@ -836,7 +855,6 @@ public: } } -private: StorageMetadataPtr metadata_snapshot; String table_name_for_log; @@ -854,6 +872,9 @@ private: ContextPtr context; int flags; std::unique_lock lock; + + std::mutex cancel_mutex; + bool cancelled = false; }; class PartitionedStorageFileSink : public PartitionedSink diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 130bc75a65c..aed3f541d47 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -599,18 +599,37 @@ public: void consume(Chunk chunk) override { + std::lock_guard lock(cancel_mutex); + if (cancelled) + return; writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } + void onCancel() override + { + std::lock_guard lock(cancel_mutex); + finalize(); + cancelled = true; + } + void onException() override { - if (!writer) - return; - onFinish(); + std::lock_guard lock(cancel_mutex); + finalize(); } void onFinish() override { + std::lock_guard lock(cancel_mutex); + finalize(); + } + +private: + void finalize() + { + if (!writer) + return; + try { writer->finalize(); @@ -625,11 +644,12 @@ public: } } -private: Block sample_block; std::optional format_settings; std::unique_ptr write_buf; OutputFormatPtr writer; + bool cancelled = false; + std::mutex cancel_mutex; }; diff --git a/src/Storages/StorageURL.cpp b/src/Storages/StorageURL.cpp index 15ae23305f3..acf7444dca4 100644 --- a/src/Storages/StorageURL.cpp +++ b/src/Storages/StorageURL.cpp @@ -447,18 +447,36 @@ StorageURLSink::StorageURLSink( void StorageURLSink::consume(Chunk chunk) { + std::lock_guard lock(cancel_mutex); + if (cancelled) + return; writer->write(getHeader().cloneWithColumns(chunk.detachColumns())); } +void StorageURLSink::onCancel() +{ + std::lock_guard lock(cancel_mutex); + finalize(); + cancelled = true; +} + void StorageURLSink::onException() { - if (!writer) - return; - onFinish(); + std::lock_guard lock(cancel_mutex); + finalize(); } void StorageURLSink::onFinish() { + std::lock_guard lock(cancel_mutex); + finalize(); +} + +void StorageURLSink::finalize() +{ + if (!writer) + return; + try { writer->finalize(); diff --git a/src/Storages/StorageURL.h b/src/Storages/StorageURL.h index 79371242bb1..0198eda9e67 100644 --- a/src/Storages/StorageURL.h +++ b/src/Storages/StorageURL.h @@ -114,12 +114,16 @@ public: std::string getName() const override { return "StorageURLSink"; } void consume(Chunk chunk) override; + void onCancel() override; void onException() override; void onFinish() override; private: + void finalize(); std::unique_ptr write_buf; OutputFormatPtr writer; + std::mutex cancel_mutex; + bool cancelled = false; }; class StorageURL : public IStorageURLBase diff --git a/tests/queries/0_stateless/02366_cancel_write_into_file.reference b/tests/queries/0_stateless/02366_cancel_write_into_file.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02366_cancel_write_into_file.sh b/tests/queries/0_stateless/02366_cancel_write_into_file.sh new file mode 100755 index 00000000000..060fe1ec74b --- /dev/null +++ b/tests/queries/0_stateless/02366_cancel_write_into_file.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash +# Tags: no-fasttest + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +for i in $(seq 1 10); +do + $CLICKHOUSE_CLIENT --query_id="02366_$i" -q "insert into function file('02366_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings output_format_parallel_formatting=1" 2> /dev/null & +done + +sleep 2 + +$CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02366_') sync" > /dev/null 2>&1 + +for i in $(seq 1 10); +do + $CLICKHOUSE_CLIENT --query_id="02366_$i" -q "insert into function file('02366_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings output_format_parallel_formatting=0" 2> /dev/null & +done + +sleep 2 + +$CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02366_') sync" > /dev/null 2>&1 + diff --git a/tests/queries/0_stateless/02368_cancel_write_into_hdfs.reference b/tests/queries/0_stateless/02368_cancel_write_into_hdfs.reference new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/02368_cancel_write_into_hdfs.sh b/tests/queries/0_stateless/02368_cancel_write_into_hdfs.sh new file mode 100755 index 00000000000..8262cd7eab5 --- /dev/null +++ b/tests/queries/0_stateless/02368_cancel_write_into_hdfs.sh @@ -0,0 +1,25 @@ +#!/usr/bin/env bash +# Tags: no-fasttest, no-stress + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + + +for i in $(seq 1 10); +do + $CLICKHOUSE_CLIENT --query_id="02368_$i" -q "insert into function hdfs('hdfs://localhost:12222/02368_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings hdfs_truncate_on_insert=1, output_format_parallel_formatting=1" 2> /dev/null & +done + +sleep 2 + +$CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02368_') sync" > /dev/null 2>&1 + +for i in $(seq 1 10); +do + $CLICKHOUSE_CLIENT --query_id="02368_$i" -q "insert into function hdfs('hdfs://localhost:12222/02368_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings hdfs_truncate_on_insert=1, output_format_parallel_formatting=0" 2> /dev/null & +done + +sleep 2 + +$CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02368_') sync" > /dev/null 2>&1 diff --git a/tests/queries/0_stateless/data_minio/02366_data.jsonl b/tests/queries/0_stateless/data_minio/02366_data.jsonl new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/queries/0_stateless/test_ugtxj2/tuples b/tests/queries/0_stateless/test_ugtxj2/tuples new file mode 100644 index 00000000000..e69de29bb2d