diff --git a/src/Common/FileChecker.cpp b/src/Common/FileChecker.cpp index 687b4dccca7..6cbec3bda77 100644 --- a/src/Common/FileChecker.cpp +++ b/src/Common/FileChecker.cpp @@ -12,6 +12,12 @@ namespace DB { +namespace ErrorCodes +{ + extern const int UNEXPECTED_END_OF_FILE; +} + + FileChecker::FileChecker(DiskPtr disk_, const String & file_info_path_) : disk(std::move(disk_)) { setPath(file_info_path_); @@ -24,19 +30,15 @@ void FileChecker::setPath(const String & file_info_path_) tmp_files_info_path = parentPath(files_info_path) + "tmp_" + fileName(files_info_path); } -void FileChecker::update(const String & file_path) +void FileChecker::update(const String & full_file_path) { initialize(); - updateImpl(file_path); - save(); + map[fileName(full_file_path)] = disk->getFileSize(full_file_path); } -void FileChecker::update(const Strings::const_iterator & begin, const Strings::const_iterator & end) +void FileChecker::setEmpty(const String & full_file_path) { - initialize(); - for (auto it = begin; it != end; ++it) - updateImpl(*it); - save(); + map[fileName(full_file_path)] = 0; } CheckResults FileChecker::check() const @@ -73,6 +75,28 @@ CheckResults FileChecker::check() const return results; } +void FileChecker::repair() +{ + for (const auto & name_size : map) + { + const String & name = name_size.first; + size_t expected_size = name_size.second; + String path = parentPath(files_info_path) + name; + bool exists = disk->exists(path); + auto real_size = exists ? disk->getFileSize(path) : 0; /// No race condition assuming no one else is working with these files. + + if (real_size < expected_size) + throw Exception(ErrorCodes::UNEXPECTED_END_OF_FILE, "Size of {} is less than expected. Size is {} but should be {}.", + path, real_size, expected_size); + + if (real_size > expected_size) + { + LOG_WARNING(&Poco::Logger::get("FileChecker"), "Will truncate file {} that has size {} to size {}", path, real_size, expected_size); + disk->truncateFile(path, expected_size); + } + } +} + void FileChecker::initialize() { if (initialized) @@ -82,11 +106,6 @@ void FileChecker::initialize() initialized = true; } -void FileChecker::updateImpl(const String & file_path) -{ - map[fileName(file_path)] = disk->getFileSize(file_path); -} - void FileChecker::save() const { { diff --git a/src/Common/FileChecker.h b/src/Common/FileChecker.h index 83db397e78c..015d4cadb07 100644 --- a/src/Common/FileChecker.h +++ b/src/Common/FileChecker.h @@ -14,19 +14,25 @@ class FileChecker public: FileChecker(DiskPtr disk_, const String & file_info_path_); void setPath(const String & file_info_path_); - void update(const String & file_path); - void update(const Strings::const_iterator & begin, const Strings::const_iterator & end); + + void update(const String & full_file_path); + void setEmpty(const String & full_file_path); + void save() const; /// Check the files whose parameters are specified in sizes.json CheckResults check() const; + /// Truncate files that have excessive size to the expected size. + /// Throw exception if the file size is less than expected. + /// The purpose of this function is to rollback a group of unfinished writes. + void repair(); + private: /// File name -> size. using Map = std::map; void initialize(); void updateImpl(const String & file_path); - void save() const; void load(Map & local_map, const String & path) const; DiskPtr disk; diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 68f5ee99a7a..f9e988211da 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -19,6 +19,7 @@ namespace ErrorCodes extern const int EXCESSIVE_ELEMENT_IN_CONFIG; extern const int PATH_ACCESS_DENIED; extern const int INCORRECT_DISK_INDEX; + extern const int CANNOT_TRUNCATE_FILE; } std::mutex DiskLocal::reservation_mutex; @@ -261,6 +262,13 @@ void DiskLocal::createHardLink(const String & src_path, const String & dst_path) DB::createHardLink(disk_path + src_path, disk_path + dst_path); } +void DiskLocal::truncateFile(const String & path, size_t size) +{ + int res = truncate((disk_path + path).c_str(), size); + if (-1 == res) + throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE); +} + void DiskLocal::createFile(const String & path) { Poco::File(disk_path + path).createFile(); diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 3dab4614d5d..71c4dc0aec9 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -99,6 +99,8 @@ public: void createHardLink(const String & src_path, const String & dst_path) override; + void truncateFile(const String & path, size_t size) override; + const String getType() const override { return "local"; } private: diff --git a/src/Disks/DiskMemory.cpp b/src/Disks/DiskMemory.cpp index 3e43d159ba5..96d9e22c414 100644 --- a/src/Disks/DiskMemory.cpp +++ b/src/Disks/DiskMemory.cpp @@ -408,6 +408,17 @@ void DiskMemory::setReadOnly(const String &) throw Exception("Method setReadOnly is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); } +void DiskMemory::truncateFile(const String & path, size_t size) +{ + std::lock_guard lock(mutex); + + auto file_it = files.find(path); + if (file_it == files.end()) + throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST); + + file_it->second.data.resize(size); +} + using DiskMemoryPtr = std::shared_ptr; diff --git a/src/Disks/DiskMemory.h b/src/Disks/DiskMemory.h index f7948019fe8..fc265ddef03 100644 --- a/src/Disks/DiskMemory.h +++ b/src/Disks/DiskMemory.h @@ -90,6 +90,8 @@ public: void createHardLink(const String & src_path, const String & dst_path) override; + void truncateFile(const String & path, size_t size) override; + const String getType() const override { return "memory"; } private: diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp index 837ddf1b6b2..9d7424d1286 100644 --- a/src/Disks/IDisk.cpp +++ b/src/Disks/IDisk.cpp @@ -8,6 +8,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + bool IDisk::isDirectoryEmpty(const String & path) { return !iterateDirectory(path)->isValid(); @@ -42,4 +47,9 @@ void IDisk::copy(const String & from_path, const std::shared_ptr & to_dis } } +void IDisk::truncateFile(const String &, size_t) +{ + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType()); +} + } diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 77a52a7a5d6..0a977feb9a1 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -172,6 +172,9 @@ public: /// Create hardlink from `src_path` to `dst_path`. virtual void createHardLink(const String & src_path, const String & dst_path) = 0; + /// Truncate file to specified size. + virtual void truncateFile(const String & path, size_t size); + /// Return disk type - "local", "s3", etc. virtual const String getType() const = 0; }; diff --git a/src/Storages/StorageLog.cpp b/src/Storages/StorageLog.cpp index 39fa1d1af70..9cfc906108a 100644 --- a/src/Storages/StorageLog.cpp +++ b/src/Storages/StorageLog.cpp @@ -127,7 +127,12 @@ public: { try { - writeSuffix(); + if (!done) + { + /// Rollback partial writes. + streams.clear(); + storage.file_checker.repair(); + } } catch (...) { @@ -298,7 +303,6 @@ void LogBlockOutputStream::writeSuffix() { if (done) return; - done = true; WrittenStreams written_streams; IDataType::SerializeBinaryBulkSettings settings; @@ -323,9 +327,12 @@ void LogBlockOutputStream::writeSuffix() column_files.push_back(storage.files[name_stream.first].data_file_path); column_files.push_back(storage.marks_file_path); - storage.file_checker.update(column_files.begin(), column_files.end()); + for (const auto & file : column_files) + storage.file_checker.update(file); + storage.file_checker.save(); streams.clear(); + done = true; } @@ -427,6 +434,7 @@ StorageLog::StorageLog( const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, + bool attach, size_t max_compress_block_size_) : IStorage(table_id_) , disk(std::move(disk_)) @@ -442,13 +450,31 @@ StorageLog::StorageLog( if (relative_path_.empty()) throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME); - /// create directories if they do not exist - disk->createDirectories(table_path); + if (!attach) + { + /// create directories if they do not exist + disk->createDirectories(table_path); + } + else + { + try + { + file_checker.repair(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } for (const auto & column : storage_metadata.getColumns().getAllPhysical()) addFiles(column.name, *column.type); marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME; + + if (!attach) + for (const auto & file : files) + file_checker.setEmpty(file.second.data_file_path); } @@ -655,7 +681,7 @@ void registerStorageLog(StorageFactory & factory) return StorageLog::create( disk, args.relative_data_path, args.table_id, args.columns, args.constraints, - args.context.getSettings().max_compress_block_size); + args.attach, args.context.getSettings().max_compress_block_size); }, features); } diff --git a/src/Storages/StorageLog.h b/src/Storages/StorageLog.h index d020f906609..96acb1668e2 100644 --- a/src/Storages/StorageLog.h +++ b/src/Storages/StorageLog.h @@ -54,6 +54,7 @@ protected: const StorageID & table_id_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, + bool attach, size_t max_compress_block_size_); private: diff --git a/src/Storages/StorageStripeLog.cpp b/src/Storages/StorageStripeLog.cpp index e55cc190f80..ae8162d5f1b 100644 --- a/src/Storages/StorageStripeLog.cpp +++ b/src/Storages/StorageStripeLog.cpp @@ -161,11 +161,12 @@ public: , lock(storage.rwlock) , data_out_file(storage.table_path + "data.bin") , data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)) - , data_out(*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size) + , data_out(std::make_unique( + *data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size)) , index_out_file(storage.table_path + "index.mrk") , index_out_compressed(storage.disk->writeFile(index_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append)) - , index_out(*index_out_compressed) - , block_out(data_out, 0, metadata_snapshot->getSampleBlock(), false, &index_out, storage.disk->getFileSize(data_out_file)) + , index_out(std::make_unique(*index_out_compressed)) + , block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file)) { } @@ -173,7 +174,16 @@ public: { try { - writeSuffix(); + if (!done) + { + /// Rollback partial writes. + data_out.reset(); + data_out_compressed.reset(); + index_out.reset(); + index_out_compressed.reset(); + + storage.file_checker.repair(); + } } catch (...) { @@ -194,13 +204,14 @@ public: return; block_out.writeSuffix(); - data_out.next(); + data_out->next(); data_out_compressed->next(); - index_out.next(); + index_out->next(); index_out_compressed->next(); storage.file_checker.update(data_out_file); storage.file_checker.update(index_out_file); + storage.file_checker.save(); done = true; } @@ -212,10 +223,10 @@ private: String data_out_file; std::unique_ptr data_out_compressed; - CompressedWriteBuffer data_out; + std::unique_ptr data_out; String index_out_file; std::unique_ptr index_out_compressed; - CompressedWriteBuffer index_out; + std::unique_ptr index_out; NativeBlockOutputStream block_out; bool done = false; @@ -249,6 +260,20 @@ StorageStripeLog::StorageStripeLog( { /// create directories if they do not exist disk->createDirectories(table_path); + + file_checker.setEmpty(table_path + "data.bin"); + file_checker.setEmpty(table_path + "index.mrk"); + } + else + { + try + { + file_checker.repair(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } } } diff --git a/src/Storages/StorageTinyLog.cpp b/src/Storages/StorageTinyLog.cpp index ef8c30cacbe..b68ac6ae5f1 100644 --- a/src/Storages/StorageTinyLog.cpp +++ b/src/Storages/StorageTinyLog.cpp @@ -118,7 +118,12 @@ public: { try { - writeSuffix(); + if (!done) + { + /// Rollback partial writes. + streams.clear(); + storage.file_checker.repair(); + } } catch (...) { @@ -277,11 +282,13 @@ void TinyLogBlockOutputStream::writeSuffix() { if (done) return; - done = true; /// If nothing was written - leave the table in initial state. if (streams.empty()) + { + done = true; return; + } WrittenStreams written_streams; IDataType::SerializeBinaryBulkSettings settings; @@ -303,9 +310,12 @@ void TinyLogBlockOutputStream::writeSuffix() for (auto & pair : streams) column_files.push_back(storage.files[pair.first].data_file_path); - storage.file_checker.update(column_files.begin(), column_files.end()); + for (const auto & file : column_files) + storage.file_checker.update(file); + storage.file_checker.save(); streams.clear(); + done = true; } @@ -352,9 +362,24 @@ StorageTinyLog::StorageTinyLog( /// create directories if they do not exist disk->createDirectories(table_path); } + else + { + try + { + file_checker.repair(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } for (const auto & col : storage_metadata.getColumns().getAllPhysical()) addFiles(col.name, *col.type); + + if (!attach) + for (const auto & file : files) + file_checker.setEmpty(file.second.data_file_path); } diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index c97adaf118d..13c96fbab54 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -31,7 +31,7 @@ DB::StoragePtr createStorage(DB::DiskPtr & disk) names_and_types.emplace_back("a", std::make_shared()); StoragePtr table = StorageLog::create( - disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576); + disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, false, 1048576); table->startup(); @@ -100,6 +100,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::Context & cont BlockOutputStreamPtr out = table->write({}, metadata_snapshot, context); out->write(block); + out->writeSuffix(); return data; } @@ -115,7 +116,8 @@ std::string readData(DB::StoragePtr & table, const DB::Context & context) QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context); - BlockInputStreamPtr in = std::make_shared(std::move(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1)[0])); + BlockInputStreamPtr in = std::make_shared( + std::move(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1)[0])); Block sample; { diff --git a/tests/queries/0_stateless/01383_log_broken_table.reference b/tests/queries/0_stateless/01383_log_broken_table.reference new file mode 100644 index 00000000000..1bc7c914e46 --- /dev/null +++ b/tests/queries/0_stateless/01383_log_broken_table.reference @@ -0,0 +1,3 @@ +Testing TinyLog +Testing StripeLog +Testing Log diff --git a/tests/queries/0_stateless/01383_log_broken_table.sh b/tests/queries/0_stateless/01383_log_broken_table.sh new file mode 100755 index 00000000000..2afac11e7c2 --- /dev/null +++ b/tests/queries/0_stateless/01383_log_broken_table.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none +. $CURDIR/../shell_config.sh + + +function test() +{ + ENGINE=$1 + MAX_MEM=4096 + + echo "Testing $ENGINE" + + $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS log"; + $CLICKHOUSE_CLIENT --query "CREATE TABLE log (x UInt64, y UInt64, z UInt64) ENGINE = $ENGINE"; + + while true; do + MAX_MEM=$((2 * $MAX_MEM)) + + $CLICKHOUSE_CLIENT --query "INSERT INTO log SELECT number, number, number FROM numbers(1000000)" --max_memory_usage $MAX_MEM > ${CLICKHOUSE_TMP}/insert_result 2>&1 + + grep -o -F 'Memory limit' ${CLICKHOUSE_TMP}/insert_result || cat ${CLICKHOUSE_TMP}/insert_result + + $CLICKHOUSE_CLIENT --query "SELECT count(), sum(x + y + z) FROM log" > ${CLICKHOUSE_TMP}/select_result 2>&1; + + grep -o -F 'File not found' ${CLICKHOUSE_TMP}/select_result || cat ${CLICKHOUSE_TMP}/select_result + + [[ $MAX_MEM -gt 200000000 ]] && break; + done + + $CLICKHOUSE_CLIENT --query "DROP TABLE log"; +} + +test TinyLog | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)' +test StripeLog | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)' +test Log | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)' + +rm "${CLICKHOUSE_TMP}/insert_result" +rm "${CLICKHOUSE_TMP}/select_result"