Merge pull request #12426 from ClickHouse/log-engine-rollback-on-insert-error

Rollback insertion error in Log engines
This commit is contained in:
alexey-milovidov 2020-07-16 22:50:48 +03:00 committed by GitHub
commit fde8c87a1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 218 additions and 35 deletions

View File

@ -12,6 +12,12 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNEXPECTED_END_OF_FILE;
}
FileChecker::FileChecker(DiskPtr disk_, const String & file_info_path_) : disk(std::move(disk_))
{
setPath(file_info_path_);
@ -24,19 +30,15 @@ void FileChecker::setPath(const String & file_info_path_)
tmp_files_info_path = parentPath(files_info_path) + "tmp_" + fileName(files_info_path);
}
void FileChecker::update(const String & file_path)
void FileChecker::update(const String & full_file_path)
{
initialize();
updateImpl(file_path);
save();
map[fileName(full_file_path)] = disk->getFileSize(full_file_path);
}
void FileChecker::update(const Strings::const_iterator & begin, const Strings::const_iterator & end)
void FileChecker::setEmpty(const String & full_file_path)
{
initialize();
for (auto it = begin; it != end; ++it)
updateImpl(*it);
save();
map[fileName(full_file_path)] = 0;
}
CheckResults FileChecker::check() const
@ -73,6 +75,28 @@ CheckResults FileChecker::check() const
return results;
}
void FileChecker::repair()
{
for (const auto & name_size : map)
{
const String & name = name_size.first;
size_t expected_size = name_size.second;
String path = parentPath(files_info_path) + name;
bool exists = disk->exists(path);
auto real_size = exists ? disk->getFileSize(path) : 0; /// No race condition assuming no one else is working with these files.
if (real_size < expected_size)
throw Exception(ErrorCodes::UNEXPECTED_END_OF_FILE, "Size of {} is less than expected. Size is {} but should be {}.",
path, real_size, expected_size);
if (real_size > expected_size)
{
LOG_WARNING(&Poco::Logger::get("FileChecker"), "Will truncate file {} that has size {} to size {}", path, real_size, expected_size);
disk->truncateFile(path, expected_size);
}
}
}
void FileChecker::initialize()
{
if (initialized)
@ -82,11 +106,6 @@ void FileChecker::initialize()
initialized = true;
}
void FileChecker::updateImpl(const String & file_path)
{
map[fileName(file_path)] = disk->getFileSize(file_path);
}
void FileChecker::save() const
{
{

View File

@ -14,19 +14,25 @@ class FileChecker
public:
FileChecker(DiskPtr disk_, const String & file_info_path_);
void setPath(const String & file_info_path_);
void update(const String & file_path);
void update(const Strings::const_iterator & begin, const Strings::const_iterator & end);
void update(const String & full_file_path);
void setEmpty(const String & full_file_path);
void save() const;
/// Check the files whose parameters are specified in sizes.json
CheckResults check() const;
/// Truncate files that have excessive size to the expected size.
/// Throw exception if the file size is less than expected.
/// The purpose of this function is to rollback a group of unfinished writes.
void repair();
private:
/// File name -> size.
using Map = std::map<String, UInt64>;
void initialize();
void updateImpl(const String & file_path);
void save() const;
void load(Map & local_map, const String & path) const;
DiskPtr disk;

View File

@ -19,6 +19,7 @@ namespace ErrorCodes
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int PATH_ACCESS_DENIED;
extern const int INCORRECT_DISK_INDEX;
extern const int CANNOT_TRUNCATE_FILE;
}
std::mutex DiskLocal::reservation_mutex;
@ -261,6 +262,13 @@ void DiskLocal::createHardLink(const String & src_path, const String & dst_path)
DB::createHardLink(disk_path + src_path, disk_path + dst_path);
}
void DiskLocal::truncateFile(const String & path, size_t size)
{
int res = truncate((disk_path + path).c_str(), size);
if (-1 == res)
throwFromErrnoWithPath("Cannot truncate file " + path, path, ErrorCodes::CANNOT_TRUNCATE_FILE);
}
void DiskLocal::createFile(const String & path)
{
Poco::File(disk_path + path).createFile();

View File

@ -99,6 +99,8 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
const String getType() const override { return "local"; }
private:

View File

@ -408,6 +408,17 @@ void DiskMemory::setReadOnly(const String &)
throw Exception("Method setReadOnly is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
void DiskMemory::truncateFile(const String & path, size_t size)
{
std::lock_guard lock(mutex);
auto file_it = files.find(path);
if (file_it == files.end())
throw Exception("File '" + path + "' doesn't exist", ErrorCodes::FILE_DOESNT_EXIST);
file_it->second.data.resize(size);
}
using DiskMemoryPtr = std::shared_ptr<DiskMemory>;

View File

@ -90,6 +90,8 @@ public:
void createHardLink(const String & src_path, const String & dst_path) override;
void truncateFile(const String & path, size_t size) override;
const String getType() const override { return "memory"; }
private:

View File

@ -8,6 +8,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
bool IDisk::isDirectoryEmpty(const String & path)
{
return !iterateDirectory(path)->isValid();
@ -42,4 +47,9 @@ void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_dis
}
}
void IDisk::truncateFile(const String &, size_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Truncate operation is not implemented for disk of type {}", getType());
}
}

View File

@ -172,6 +172,9 @@ public:
/// Create hardlink from `src_path` to `dst_path`.
virtual void createHardLink(const String & src_path, const String & dst_path) = 0;
/// Truncate file to specified size.
virtual void truncateFile(const String & path, size_t size);
/// Return disk type - "local", "s3", etc.
virtual const String getType() const = 0;
};

View File

@ -127,7 +127,12 @@ public:
{
try
{
writeSuffix();
if (!done)
{
/// Rollback partial writes.
streams.clear();
storage.file_checker.repair();
}
}
catch (...)
{
@ -298,7 +303,6 @@ void LogBlockOutputStream::writeSuffix()
{
if (done)
return;
done = true;
WrittenStreams written_streams;
IDataType::SerializeBinaryBulkSettings settings;
@ -323,9 +327,12 @@ void LogBlockOutputStream::writeSuffix()
column_files.push_back(storage.files[name_stream.first].data_file_path);
column_files.push_back(storage.marks_file_path);
storage.file_checker.update(column_files.begin(), column_files.end());
for (const auto & file : column_files)
storage.file_checker.update(file);
storage.file_checker.save();
streams.clear();
done = true;
}
@ -427,6 +434,7 @@ StorageLog::StorageLog(
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_)
: IStorage(table_id_)
, disk(std::move(disk_))
@ -442,13 +450,31 @@ StorageLog::StorageLog(
if (relative_path_.empty())
throw Exception("Storage " + getName() + " requires data path", ErrorCodes::INCORRECT_FILE_NAME);
if (!attach)
{
/// create directories if they do not exist
disk->createDirectories(table_path);
}
else
{
try
{
file_checker.repair();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
for (const auto & column : storage_metadata.getColumns().getAllPhysical())
addFiles(column.name, *column.type);
marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME;
if (!attach)
for (const auto & file : files)
file_checker.setEmpty(file.second.data_file_path);
}
@ -655,7 +681,7 @@ void registerStorageLog(StorageFactory & factory)
return StorageLog::create(
disk, args.relative_data_path, args.table_id, args.columns, args.constraints,
args.context.getSettings().max_compress_block_size);
args.attach, args.context.getSettings().max_compress_block_size);
}, features);
}

View File

@ -54,6 +54,7 @@ protected:
const StorageID & table_id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
bool attach,
size_t max_compress_block_size_);
private:

View File

@ -161,11 +161,12 @@ public:
, lock(storage.rwlock)
, data_out_file(storage.table_path + "data.bin")
, data_out_compressed(storage.disk->writeFile(data_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, data_out(*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size)
, data_out(std::make_unique<CompressedWriteBuffer>(
*data_out_compressed, CompressionCodecFactory::instance().getDefaultCodec(), storage.max_compress_block_size))
, index_out_file(storage.table_path + "index.mrk")
, index_out_compressed(storage.disk->writeFile(index_out_file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append))
, index_out(*index_out_compressed)
, block_out(data_out, 0, metadata_snapshot->getSampleBlock(), false, &index_out, storage.disk->getFileSize(data_out_file))
, index_out(std::make_unique<CompressedWriteBuffer>(*index_out_compressed))
, block_out(*data_out, 0, metadata_snapshot->getSampleBlock(), false, index_out.get(), storage.disk->getFileSize(data_out_file))
{
}
@ -173,7 +174,16 @@ public:
{
try
{
writeSuffix();
if (!done)
{
/// Rollback partial writes.
data_out.reset();
data_out_compressed.reset();
index_out.reset();
index_out_compressed.reset();
storage.file_checker.repair();
}
}
catch (...)
{
@ -194,13 +204,14 @@ public:
return;
block_out.writeSuffix();
data_out.next();
data_out->next();
data_out_compressed->next();
index_out.next();
index_out->next();
index_out_compressed->next();
storage.file_checker.update(data_out_file);
storage.file_checker.update(index_out_file);
storage.file_checker.save();
done = true;
}
@ -212,10 +223,10 @@ private:
String data_out_file;
std::unique_ptr<WriteBuffer> data_out_compressed;
CompressedWriteBuffer data_out;
std::unique_ptr<CompressedWriteBuffer> data_out;
String index_out_file;
std::unique_ptr<WriteBuffer> index_out_compressed;
CompressedWriteBuffer index_out;
std::unique_ptr<CompressedWriteBuffer> index_out;
NativeBlockOutputStream block_out;
bool done = false;
@ -249,6 +260,20 @@ StorageStripeLog::StorageStripeLog(
{
/// create directories if they do not exist
disk->createDirectories(table_path);
file_checker.setEmpty(table_path + "data.bin");
file_checker.setEmpty(table_path + "index.mrk");
}
else
{
try
{
file_checker.repair();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -118,7 +118,12 @@ public:
{
try
{
writeSuffix();
if (!done)
{
/// Rollback partial writes.
streams.clear();
storage.file_checker.repair();
}
}
catch (...)
{
@ -277,11 +282,13 @@ void TinyLogBlockOutputStream::writeSuffix()
{
if (done)
return;
done = true;
/// If nothing was written - leave the table in initial state.
if (streams.empty())
{
done = true;
return;
}
WrittenStreams written_streams;
IDataType::SerializeBinaryBulkSettings settings;
@ -303,9 +310,12 @@ void TinyLogBlockOutputStream::writeSuffix()
for (auto & pair : streams)
column_files.push_back(storage.files[pair.first].data_file_path);
storage.file_checker.update(column_files.begin(), column_files.end());
for (const auto & file : column_files)
storage.file_checker.update(file);
storage.file_checker.save();
streams.clear();
done = true;
}
@ -352,9 +362,24 @@ StorageTinyLog::StorageTinyLog(
/// create directories if they do not exist
disk->createDirectories(table_path);
}
else
{
try
{
file_checker.repair();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
for (const auto & col : storage_metadata.getColumns().getAllPhysical())
addFiles(col.name, *col.type);
if (!attach)
for (const auto & file : files)
file_checker.setEmpty(file.second.data_file_path);
}

View File

@ -31,7 +31,7 @@ DB::StoragePtr createStorage(DB::DiskPtr & disk)
names_and_types.emplace_back("a", std::make_shared<DataTypeUInt64>());
StoragePtr table = StorageLog::create(
disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, 1048576);
disk, "table/", StorageID("test", "test"), ColumnsDescription{names_and_types}, ConstraintsDescription{}, false, 1048576);
table->startup();
@ -100,6 +100,7 @@ std::string writeData(int rows, DB::StoragePtr & table, const DB::Context & cont
BlockOutputStreamPtr out = table->write({}, metadata_snapshot, context);
out->write(block);
out->writeSuffix();
return data;
}
@ -115,7 +116,8 @@ std::string readData(DB::StoragePtr & table, const DB::Context & context)
QueryProcessingStage::Enum stage = table->getQueryProcessingStage(context);
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(std::move(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1)[0]));
BlockInputStreamPtr in = std::make_shared<TreeExecutorBlockInputStream>(
std::move(table->read(column_names, metadata_snapshot, {}, context, stage, 8192, 1)[0]));
Block sample;
{

View File

@ -0,0 +1,3 @@
Testing TinyLog
Testing StripeLog
Testing Log

View File

@ -0,0 +1,40 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL=none
. $CURDIR/../shell_config.sh
function test()
{
ENGINE=$1
MAX_MEM=4096
echo "Testing $ENGINE"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS log";
$CLICKHOUSE_CLIENT --query "CREATE TABLE log (x UInt64, y UInt64, z UInt64) ENGINE = $ENGINE";
while true; do
MAX_MEM=$((2 * $MAX_MEM))
$CLICKHOUSE_CLIENT --query "INSERT INTO log SELECT number, number, number FROM numbers(1000000)" --max_memory_usage $MAX_MEM > ${CLICKHOUSE_TMP}/insert_result 2>&1
grep -o -F 'Memory limit' ${CLICKHOUSE_TMP}/insert_result || cat ${CLICKHOUSE_TMP}/insert_result
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(x + y + z) FROM log" > ${CLICKHOUSE_TMP}/select_result 2>&1;
grep -o -F 'File not found' ${CLICKHOUSE_TMP}/select_result || cat ${CLICKHOUSE_TMP}/select_result
[[ $MAX_MEM -gt 200000000 ]] && break;
done
$CLICKHOUSE_CLIENT --query "DROP TABLE log";
}
test TinyLog | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)'
test StripeLog | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)'
test Log | grep -v -P '^(Memory limit|0\t0|File not found|[1-9]000000\t)'
rm "${CLICKHOUSE_TMP}/insert_result"
rm "${CLICKHOUSE_TMP}/select_result"