fsyncs for metadata files of part

Anton Popov 2020-09-01 18:26:49 +03:00
parent 927eb32e88
commit 3cadc9033a
7 changed files with 32 additions and 18 deletions

View File

@@ -52,7 +52,7 @@ public:
     virtual void initPrimaryIndex() {}

     virtual void finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync) = 0;
     virtual void finishPrimaryIndexSerialization(MergeTreeData::DataPart::Checksums & /* checksums */, bool /* sync */) {}
     virtual void finishSkipIndicesSerialization(MergeTreeData::DataPart::Checksums & /* checksums */, bool /* sync */) {}

     Columns releaseIndexColumns();
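
The interface above threads a `sync` flag through every finish* hook, so the caller decides per part whether index and metadata streams must be fsynced. Judging by the settings exercised in the test table added later in this commit (fsync_after_insert, min_compressed_bytes_to_fsync_after_merge), the decision plausibly looks like the sketch below; the struct and helper names are illustrative stand-ins, not the ClickHouse API.

    // Hypothetical sketch: deriving the `sync` argument from per-table
    // MergeTree settings before calling the finish* methods.
    #include <cstdint>

    struct SettingsSketch
    {
        bool fsync_after_insert = false;
        uint64_t min_compressed_bytes_to_fsync_after_merge = 0;
    };

    // Inserts: durability is a plain per-table switch.
    bool needSyncAfterInsert(const SettingsSketch & s)
    {
        return s.fsync_after_insert;
    }

    // Merges: fsync only parts large enough to be expensive to rebuild
    // (a threshold of 0 disables syncing after merges).
    bool needSyncAfterMerge(const SettingsSketch & s, uint64_t compressed_bytes)
    {
        return s.min_compressed_bytes_to_fsync_after_merge != 0
            && compressed_bytes >= s.min_compressed_bytes_to_fsync_after_merge;
    }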

View File

@@ -332,7 +332,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
         checksums.files["primary.idx"].file_size = index_stream->count();
         checksums.files["primary.idx"].file_hash = index_stream->getHash();
         if (sync)
-            index_stream->sync();
+            index_file_stream->sync();
         index_stream = nullptr;
     }
 }
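
The one-line fix above is easy to miss: `index_stream` is the hashing wrapper that computes the checksum, while `index_file_stream` (by its name) is the buffer that actually owns the file descriptor. Only the latter can issue a real fsync, so syncing the wrapper would not make primary.idx durable. A self-contained sketch of that layering, with stand-in classes rather than the ClickHouse ones:

    #include <cstddef>
    #include <cstdint>
    #include <fcntl.h>
    #include <unistd.h>
    #include <stdexcept>
    #include <string>

    // Owns the file descriptor; its sync() reaches the kernel.
    class FileWriter
    {
    public:
        explicit FileWriter(const std::string & path)
            : fd(::open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644))
        {
            if (fd < 0)
                throw std::runtime_error("cannot open " + path);
        }

        void write(const char * data, size_t size)
        {
            if (::write(fd, data, size) < 0)
                throw std::runtime_error("write failed");
        }

        void sync() { ::fsync(fd); }   // flush contents to stable storage

        ~FileWriter() { ::close(fd); }

    private:
        int fd;
    };

    // Checksums bytes on the way through; it holds no file descriptor,
    // so durability must be requested from the underlying FileWriter.
    class HashingWriter
    {
    public:
        explicit HashingWriter(FileWriter & out_) : out(out_) {}

        void write(const char * data, size_t size)
        {
            for (size_t i = 0; i < size; ++i)
                hash = hash * 31 + static_cast<unsigned char>(data[i]);
            out.write(data, size);
        }

        uint64_t getHash() const { return hash; }

    private:
        FileWriter & out;
        uint64_t hash = 0;
    };

    int main()
    {
        FileWriter file("primary.idx");   // plays the role of index_file_stream
        HashingWriter hashing(file);      // plays the role of index_stream
        hashing.write("marks", 5);
        file.sync();                      // fsync the fd, not the wrapper
    }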

View File

@@ -111,7 +111,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
         part_columns = *total_columns_list;

     if (new_part->isStoredOnDisk())
-        finalizePartOnDisk(new_part, part_columns, checksums);
+        finalizePartOnDisk(new_part, part_columns, checksums, sync);

     new_part->setColumns(part_columns);
     new_part->rows_count = rows_count;
@@ -126,7 +126,8 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
 void MergedBlockOutputStream::finalizePartOnDisk(
     const MergeTreeData::MutableDataPartPtr & new_part,
     NamesAndTypesList & part_columns,
-    MergeTreeData::DataPart::Checksums & checksums)
+    MergeTreeData::DataPart::Checksums & checksums,
+    bool sync)
 {
     if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || isCompactPart(new_part))
     {
@@ -143,6 +144,8 @@ void MergedBlockOutputStream::finalizePartOnDisk(
         count_out_hashing.next();
         checksums.files["count.txt"].file_size = count_out_hashing.count();
         checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
+        if (sync)
+            count_out->sync();
     }

     if (!new_part->ttl_infos.empty())
@@ -153,6 +156,8 @@ void MergedBlockOutputStream::finalizePartOnDisk(
         new_part->ttl_infos.write(out_hashing);
         checksums.files["ttl.txt"].file_size = out_hashing.count();
         checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
+        if (sync)
+            out->sync();
     }

     removeEmptyColumnsFromPart(new_part, part_columns, checksums);
@@ -161,12 +166,16 @@ void MergedBlockOutputStream::finalizePartOnDisk(
         /// Write a file with a description of columns.
         auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
         part_columns.writeText(*out);
+        if (sync)
+            out->sync();
     }

     {
         /// Write file with checksums.
         auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
         checksums.write(*out);
+        if (sync)
+            out->sync();
     }
 }
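
All four hunks in this file apply the same pattern: write a small metadata file (count.txt, ttl.txt, columns.txt, checksums.txt), then fsync it if the caller requested a durable part. A minimal standalone version of that pattern using POSIX calls; writeMetadataFile is a hypothetical helper, not a function from the codebase:

    #include <fcntl.h>
    #include <unistd.h>
    #include <stdexcept>
    #include <string>

    void writeMetadataFile(const std::string & path, const std::string & contents, bool sync)
    {
        int fd = ::open(path.c_str(), O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0)
            throw std::runtime_error("cannot open " + path);

        const char * p = contents.data();
        size_t left = contents.size();
        while (left > 0)   // write() may be partial; loop until done
        {
            ssize_t n = ::write(fd, p, left);
            if (n < 0)
            {
                ::close(fd);
                throw std::runtime_error("cannot write " + path);
            }
            p += n;
            left -= static_cast<size_t>(n);
        }

        // Without this, a power loss can leave an empty or truncated file
        // even though every write() above returned success.
        if (sync)
            ::fsync(fd);

        ::close(fd);
    }

    int main()
    {
        writeMetadataFile("count.txt", "10\n", /*sync=*/true);
    }

Note that fsync on the file makes its contents durable but not the directory entry that names it; persisting a freshly created file also requires syncing the parent directory, which ClickHouse exposes as a separate part-directory fsync setting rather than handling here.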

View File

@@ -59,7 +59,8 @@ private:
     void finalizePartOnDisk(
         const MergeTreeData::MutableDataPartPtr & new_part,
         NamesAndTypesList & part_columns,
-        MergeTreeData::DataPart::Checksums & checksums);
+        MergeTreeData::DataPart::Checksums & checksums,
+        bool sync);

 private:
     NamesAndTypesList columns_list;

View File

@@ -0,0 +1 @@
+CREATE TABLE test_sync (a Int, s String) ENGINE = MergeTree ORDER BY a SETTINGS fsync_after_insert = 1, min_compressed_bytes_to_fsync_after_merge = 1;

utils/durability-test/durability-test.sh Normal file → Executable file
View File

@@ -17,12 +17,12 @@ fi
 function run()
 {
-    sshpass -p $PASSWORD ssh -p $SSH_PORT root@localhost "$1"
+    sshpass -p $PASSWORD ssh -p $SSH_PORT root@localhost "$1" 2>/dev/null
 }

 function copy()
 {
-    sshpass -p $PASSWORD scp -r -P $SSH_PORT $1 root@localhost:$2
+    sshpass -p $PASSWORD scp -r -P $SSH_PORT $1 root@localhost:$2 2>/dev/null
 }

 function wait_vm_for_start()
@@ -50,8 +50,8 @@ function wait_clickhouse_for_start()
 {
     echo "Waiting until ClickHouse started..."
     started=0
-    for i in {0..15}; do
-        run "clickhouse client --query 'select 1'"
+    for i in {0..30}; do
+        run "clickhouse client --query 'select 1'" > /dev/null
         if [ $? -eq 0 ]; then
             started=1
             break
@@ -70,7 +70,7 @@ echo "Downloading image"
 curl -O $URL/$IMAGE
 qemu-img resize $IMAGE +10G
-virt-customize -a $IMAGE --root-password password:$PASSWORD
+virt-customize -a $IMAGE --root-password password:$PASSWORD > /dev/null 2>&1
 virt-copy-in -a $IMAGE sshd_config /etc/ssh

 echo "Starting VM"
@@ -93,8 +93,8 @@ if [[ -z $CLICKHOUSE_CONFIG_DIR ]]; then
     CLICKHOUSE_CONFIG_DIR=/etc/clickhouse-server
 fi

-echo "Using ClickHouse binary: " $CLICKHOUSE_BINARY
-echo "Using ClickHouse config from: " $CLICKHOUSE_CONFIG_DIR
+echo "Using ClickHouse binary:" $CLICKHOUSE_BINARY
+echo "Using ClickHouse config from:" $CLICKHOUSE_CONFIG_DIR

 copy $CLICKHOUSE_BINARY /usr/bin
 copy $CLICKHOUSE_CONFIG_DIR /etc
@@ -104,23 +104,19 @@ echo "Prepared VM"
 echo "Starting ClickHouse"
 run "clickhouse server --config-file=/etc/clickhouse-server/config.xml > clickhouse-server.log 2>&1" &
 wait_clickhouse_for_start
-echo "Started ClickHouse"

 query=`cat $CREATE_QUERY`
 echo "Executing query:" $query
 run "clickhouse client --query '$query'"

 query=`cat $INSERT_QUERY`
 echo "Will run in a loop query: " $query
-run "clickhouse benchmark <<< '$query'" &
+run "clickhouse benchmark <<< '$query' -c 8" &
 echo "Running queries"

 pid=`pidof qemu-system-x86_64`
-sec=$(( (RANDOM % 3) + 25 ))
+sec=$(( (RANDOM % 5) + 25 ))
 ms=$(( RANDOM % 1000 ))

 echo "Will kill VM in $sec.$ms sec"
@@ -130,6 +126,8 @@ kill -9 $pid

 echo "Restarting"
+sleep 5s
 ./startup.exp > qemu.log 2>&1 &
 wait_vm_for_start
@@ -137,10 +135,12 @@ run "rm -r *data/system"
 run "clickhouse server --config-file=/etc/clickhouse-server/config.xml > clickhouse-server.log 2>&1" &
 wait_clickhouse_for_start

+pid=`pidof qemu-system-x86_64`
 result=`run "grep $TABLE_NAME clickhouse-server.log | grep 'Caught exception while loading metadata'"`
 if [[ -n $result ]]; then
     echo "FAIL. Can't attach table:"
     echo $result
+    kill -9 $pid
     exit 1
 fi
@@ -148,7 +148,9 @@ result=`run "grep $TABLE_NAME clickhouse-server.log | grep 'Considering to remov
 if [[ -n $result ]]; then
     echo "FAIL. Have broken parts:"
     echo $result
+    kill -9 $pid
     exit 1
 fi

+kill -9 $pid
 echo OK

View File

@@ -0,0 +1 @@
+INSERT INTO test_sync SELECT number, toString(number) FROM numbers(10)