Merge branch 'master' into wait_for_mutation_race

This commit is contained in:
Alexander Gololobov 2022-08-04 20:27:09 +02:00 committed by GitHub
commit eb3e2ed092
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 163 additions and 23 deletions

View File

@ -77,7 +77,7 @@ def run_func_test(
pipes = []
for i in range(0, len(output_paths)):
f = open(output_paths[i], "w")
full_command = "{} {} {} {} {}".format(
full_command = "{} {} {} {} {} --stress".format(
cmd,
get_options(i, backward_compatibility_check),
global_time_limit_option,

View File

@ -34,7 +34,7 @@ CREATE TABLE table_with_ttl
)
ENGINE MergeTree()
ORDER BY tuple()
TTL event_time + INTERVAL 3 MONTH;
TTL event_time + INTERVAL 3 MONTH
SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO table_with_ttl VALUES (now(), 1, 'username1');

View File

@ -73,7 +73,6 @@ void IOutputFormat::work()
setRowsBeforeLimit(rows_before_limit_counter->get());
finalize();
finalized = true;
return;
}
@ -120,9 +119,12 @@ void IOutputFormat::write(const Block & block)
/// Finalizes the format at most once.
/// On the first call it runs writePrefixIfNot(), writeSuffixIfNot() and
/// finalizeImpl() in that order and then raises `finalized`; every later
/// call sees the flag and does nothing.
void IOutputFormat::finalize()
{
    if (!finalized)
    {
        writePrefixIfNot();
        writeSuffixIfNot();
        finalizeImpl();
        /// Only reached when none of the calls above threw.
        finalized = true;
    }
}
}

View File

@ -427,18 +427,37 @@ public:
/// Writes one chunk through `writer`, unless the sink has already been cancelled.
/// The whole operation runs under cancel_mutex, the same mutex onCancel() takes.
void consume(Chunk chunk) override
{
    std::lock_guard cancel_guard(cancel_mutex);
    if (!cancelled)
    {
        /// Rebuild a block with the sink's header layout from the chunk's columns.
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        writer->write(block);
    }
}
/// Cancellation hook. While holding cancel_mutex, finalizes the output and then
/// raises `cancelled`, which makes subsequent consume() calls return without writing.
void onCancel() override
{
std::lock_guard lock(cancel_mutex);
finalize();
cancelled = true;
}
/// Exception hook: finalizes the output while holding cancel_mutex.
/// There is deliberately no separate `writer` check and no call to onFinish() here:
/// finalize() itself returns early when there is no writer, and testing `writer`
/// outside the lock would race with the other hooks. Going through onFinish()
/// first and then calling finalize() again would finalize the output twice.
void onException() override
{
    std::lock_guard lock(cancel_mutex);
    finalize();
}
/// Finish hook: finalizes the output while holding cancel_mutex.
void onFinish() override
{
std::lock_guard lock(cancel_mutex);
finalize();
}
private:
void finalize()
{
if (!writer)
return;
try
{
writer->finalize();
@ -454,9 +473,10 @@ public:
}
}
private:
std::unique_ptr<WriteBuffer> write_buf;
/// Output format that consume() writes blocks to and finalize() finalizes; may be null.
OutputFormatPtr writer;
/// Locked by consume(), onCancel(), onException() and onFinish().
std::mutex cancel_mutex;
/// Raised by onCancel(); once true, consume() returns without writing.
bool cancelled = false;
};
class PartitionedHDFSSink : public PartitionedSink

View File

@ -929,7 +929,6 @@ void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) c
void IMergeTreeDataPart::loadChecksums(bool require)
{
//const String path = fs::path(getRelativePath()) / "checksums.txt";
bool exists = metadata_manager->exists("checksums.txt");
if (exists)
{
@ -940,7 +939,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
bytes_on_disk = checksums.getTotalSizeOnDisk();
}
else
bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk(); //calculateTotalSizeOnDisk(volume->getDisk(), getRelativePath());
bytes_on_disk = data_part_storage->calculateTotalSizeOnDisk();
}
else
{

View File

@ -611,7 +611,7 @@ void finalizeMutatedPart(
{
auto out = data_part_storage_builder->writeFile(IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096, context->getWriteSettings());
DB::writeText(queryToString(codec->getFullCodecDesc()), *out);
}
} /// close fd
{
/// Write a file with a description of columns.
@ -625,9 +625,14 @@ void finalizeMutatedPart(
new_data_part->minmax_idx = source_part->minmax_idx;
new_data_part->modification_time = time(nullptr);
new_data_part->loadProjections(false, false);
new_data_part->setBytesOnDisk(new_data_part->data_part_storage->calculateTotalSizeOnDisk());
new_data_part->default_codec = codec;
/// All information about sizes is stored in checksums.
/// It doesn't make sense to touch filesystem for sizes.
new_data_part->setBytesOnDisk(new_data_part->checksums.getTotalSizeOnDisk());
/// Also use information from checksums
new_data_part->calculateColumnsAndSecondaryIndicesSizesOnDisk();
new_data_part->default_codec = codec;
}
}

View File

@ -810,18 +810,37 @@ public:
/// Writes one chunk through `writer`, unless the sink has already been cancelled.
/// The whole operation runs under cancel_mutex, the same mutex onCancel() takes.
void consume(Chunk chunk) override
{
    std::lock_guard cancel_guard(cancel_mutex);
    if (!cancelled)
    {
        /// Rebuild a block with the sink's header layout from the chunk's columns.
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        writer->write(block);
    }
}
/// Cancellation hook. While holding cancel_mutex, finalizes the output and then
/// raises `cancelled`, which makes subsequent consume() calls return without writing.
void onCancel() override
{
std::lock_guard cancel_lock(cancel_mutex);
finalize();
cancelled = true;
}
/// Exception hook: finalizes the output while holding cancel_mutex.
/// There is deliberately no separate `writer` check and no call to onFinish() here:
/// finalize() itself returns early when there is no writer, and testing `writer`
/// outside the lock would race with the other hooks. Going through onFinish()
/// first and then calling finalize() again would finalize the output twice.
void onException() override
{
    std::lock_guard cancel_lock(cancel_mutex);
    finalize();
}
/// Finish hook: finalizes the output while holding cancel_mutex.
void onFinish() override
{
std::lock_guard cancel_lock(cancel_mutex);
finalize();
}
private:
void finalize()
{
if (!writer)
return;
try
{
writer->finalize();
@ -836,7 +855,6 @@ public:
}
}
private:
StorageMetadataPtr metadata_snapshot;
String table_name_for_log;
@ -854,6 +872,9 @@ private:
ContextPtr context;
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
/// Locked by consume(), onCancel(), onException() and onFinish().
std::mutex cancel_mutex;
/// Raised by onCancel(); once true, consume() returns without writing.
bool cancelled = false;
};
class PartitionedStorageFileSink : public PartitionedSink

View File

@ -599,18 +599,37 @@ public:
/// Writes one chunk through `writer`, unless the sink has already been cancelled.
/// The whole operation runs under cancel_mutex, the same mutex onCancel() takes.
void consume(Chunk chunk) override
{
    std::lock_guard cancel_guard(cancel_mutex);
    if (!cancelled)
    {
        /// Rebuild a block with the sink's header layout from the chunk's columns.
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        writer->write(block);
    }
}
/// Cancellation hook. While holding cancel_mutex, finalizes the output and then
/// raises `cancelled`, which makes subsequent consume() calls return without writing.
void onCancel() override
{
std::lock_guard lock(cancel_mutex);
finalize();
cancelled = true;
}
/// Exception hook: finalizes the output while holding cancel_mutex.
/// There is deliberately no separate `writer` check and no call to onFinish() here:
/// finalize() itself returns early when there is no writer, and testing `writer`
/// outside the lock would race with the other hooks. Going through onFinish()
/// first and then calling finalize() again would finalize the output twice.
void onException() override
{
    std::lock_guard lock(cancel_mutex);
    finalize();
}
/// Finish hook: finalizes the output while holding cancel_mutex.
void onFinish() override
{
std::lock_guard lock(cancel_mutex);
finalize();
}
private:
void finalize()
{
if (!writer)
return;
try
{
writer->finalize();
@ -625,11 +644,12 @@ public:
}
}
private:
Block sample_block;
std::optional<FormatSettings> format_settings;
std::unique_ptr<WriteBuffer> write_buf;
/// Output format that consume() writes blocks to and finalize() finalizes; may be null.
OutputFormatPtr writer;
/// Raised by onCancel(); once true, consume() returns without writing.
bool cancelled = false;
/// Locked by consume(), onCancel(), onException() and onFinish().
std::mutex cancel_mutex;
};

View File

@ -447,18 +447,36 @@ StorageURLSink::StorageURLSink(
/// Writes one chunk through `writer`, unless the sink has already been cancelled.
/// The whole operation runs under cancel_mutex, the same mutex onCancel() takes.
void StorageURLSink::consume(Chunk chunk)
{
    std::lock_guard cancel_guard(cancel_mutex);
    if (!cancelled)
    {
        /// Rebuild a block with the sink's header layout from the chunk's columns.
        auto block = getHeader().cloneWithColumns(chunk.detachColumns());
        writer->write(block);
    }
}
/// Cancellation hook. While holding cancel_mutex, finalizes the output and then
/// raises `cancelled`, which makes subsequent consume() calls return without writing.
void StorageURLSink::onCancel()
{
std::lock_guard lock(cancel_mutex);
finalize();
cancelled = true;
}
/// Exception hook: finalizes the output while holding cancel_mutex.
/// There is deliberately no separate `writer` check and no call to onFinish() here:
/// finalize() itself returns early when there is no writer, and testing `writer`
/// outside the lock would race with the other hooks. Going through onFinish()
/// first and then calling finalize() again would finalize the output twice.
void StorageURLSink::onException()
{
    std::lock_guard lock(cancel_mutex);
    finalize();
}
/// Finish hook: finalizes the output while holding cancel_mutex.
void StorageURLSink::onFinish()
{
std::lock_guard lock(cancel_mutex);
finalize();
}
void StorageURLSink::finalize()
{
if (!writer)
return;
try
{
writer->finalize();

View File

@ -114,12 +114,16 @@ public:
/// Name reported for this sink.
std::string getName() const override { return "StorageURLSink"; }
/// Writes a chunk to the output.
void consume(Chunk chunk) override;
/// Hooks for cancellation, failure and normal completion.
void onCancel() override;
void onException() override;
void onFinish() override;
private:
/// Shared finalization routine (not an override).
void finalize();
std::unique_ptr<WriteBuffer> write_buf;
/// Output format the chunks are written through.
OutputFormatPtr writer;
/// Mutex associated with cancellation handling.
std::mutex cancel_mutex;
/// Cancellation flag; starts out false.
bool cancelled = false;
};
class StorageURL : public IStorageURLBase

View File

@ -0,0 +1,26 @@
#!/usr/bin/env bash
# Tags: no-fasttest

# Starts batches of INSERTs into the file() table function and kills them while
# they are (presumably) still running: first with parallel output formatting
# enabled, then with it disabled. stderr of the inserts and all output of the
# KILL statement are discarded.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Launch 10 background INSERTs with output_format_parallel_formatting set to $1,
# wait 2 seconds, then synchronously kill every query whose id starts with 02366_.
function insert_then_kill()
{
    local parallel_formatting=$1
    local i

    for i in {1..10}
    do
        $CLICKHOUSE_CLIENT --query_id="02366_$i" -q "insert into function file('02366_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings output_format_parallel_formatting=$parallel_formatting" 2> /dev/null &
    done

    sleep 2

    $CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02366_') sync" > /dev/null 2>&1
}

insert_then_kill 1
insert_then_kill 0

View File

@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-stress

# Starts batches of INSERTs into the hdfs() table function and kills them while
# they are (presumably) still running: first with parallel output formatting
# enabled, then with it disabled. stderr of the inserts and all output of the
# KILL statement are discarded.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Launch 10 background INSERTs with output_format_parallel_formatting set to $1,
# wait 2 seconds, then synchronously kill every query whose id starts with 02368_.
function insert_then_kill()
{
    local parallel_formatting=$1
    local i

    for i in {1..10}
    do
        $CLICKHOUSE_CLIENT --query_id="02368_$i" -q "insert into function hdfs('hdfs://localhost:12222/02368_data_$i.jsonl') select range(number % 1000) from numbers(100000) settings hdfs_truncate_on_insert=1, output_format_parallel_formatting=$parallel_formatting" 2> /dev/null &
    done

    sleep 2

    $CLICKHOUSE_CLIENT -q "kill query where startsWith(query_id, '02368_') sync" > /dev/null 2>&1
}

insert_then_kill 1
insert_then_kill 0