ClickHouse/src/Storages/MergeTree/MergeTreeWriteAheadLog.cpp

225 lines
7.4 KiB
C++
Raw Normal View History

2020-04-14 19:47:19 +00:00
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/ReadHelpers.h>
2020-04-14 19:47:19 +00:00
#include <Poco/File.h>
2020-09-01 22:25:10 +00:00
#include <sys/time.h>
2020-04-14 19:47:19 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes thrown or matched while reading and replaying a WAL file (see restore()).
    extern const int BAD_DATA_PART_NAME;
    extern const int CANNOT_READ_ALL_DATA;
    extern const int CORRUPTED_DATA;
    extern const int UNKNOWN_FORMAT_VERSION;
}
/// Binds the WAL to the file `name_` inside the table's relative data path on `disk_`.
/// The constructor also opens that file for appending (init()).
/// It registers a background task that fsyncs the file; sync() schedules that task.
MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
    MergeTreeData & storage_,
    const DiskPtr & disk_,
    const String & name_)
    : storage(storage_)
    , disk(disk_)
    , name(name_)
    , path(storage.getRelativeDataPath() + name_)
    , pool(storage.global_context.getSchedulePool())
{
    init();

    /// Runs on the schedule pool. It takes the write mutex so the fsync never
    /// interleaves with a writer or with rotate() swapping `out`.
    auto flush_to_disk = [this]
    {
        std::lock_guard guard(write_mutex);
        out->sync();
        sync_scheduled = false;
    };

    sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", flush_to_disk);
}
2020-04-14 19:47:19 +00:00
void MergeTreeWriteAheadLog::init()
{
out = disk->writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
/// Small hack: in NativeBlockOutputStream header is used only in `getHeader` method.
/// To avoid complex logic of changing it during ALTERs we leave it empty.
block_out = std::make_unique<NativeBlockOutputStream>(*out, 0, Block{});
min_block_number = std::numeric_limits<Int64>::max();
2020-05-14 20:08:15 +00:00
max_block_number = -1;
2020-09-01 22:25:10 +00:00
bytes_at_last_sync = 0;
}
2020-04-14 19:47:19 +00:00
2020-05-29 15:02:12 +00:00
void MergeTreeWriteAheadLog::addPart(const Block & block, const String & part_name)
2020-04-14 19:47:19 +00:00
{
std::lock_guard lock(write_mutex);
auto part_info = MergeTreePartInfo::fromPartName(part_name, storage.format_version);
min_block_number = std::min(min_block_number, part_info.min_block);
max_block_number = std::max(max_block_number, part_info.max_block);
writeIntBinary(static_cast<UInt8>(0), *out); /// version
2020-05-29 15:02:12 +00:00
writeIntBinary(static_cast<UInt8>(ActionType::ADD_PART), *out);
2020-04-14 19:47:19 +00:00
writeStringBinary(part_name, *out);
block_out->write(block);
block_out->flush();
2020-09-01 22:25:10 +00:00
sync(lock);
2020-04-14 19:47:19 +00:00
auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
if (out->count() > max_wal_bytes)
rotate(lock);
2020-04-14 19:47:19 +00:00
}
2020-05-29 15:02:12 +00:00
void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
std::lock_guard lock(write_mutex);
writeIntBinary(static_cast<UInt8>(0), *out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
2020-09-01 22:25:10 +00:00
sync(lock);
2020-05-29 15:02:12 +00:00
}
void MergeTreeWriteAheadLog::rotate(const std::lock_guard<std::mutex> &)
2020-04-14 19:47:19 +00:00
{
String new_name = String(WAL_FILE_NAME) + "_"
+ toString(min_block_number) + "_"
+ toString(max_block_number) + WAL_FILE_EXTENSION;
2020-05-14 20:08:15 +00:00
disk->replaceFile(path, storage.getRelativeDataPath() + new_name);
init();
2020-04-14 19:47:19 +00:00
}
2020-06-26 11:30:23 +00:00
/// Replays this WAL file and returns the in-memory parts recorded in it.
/// Parts named by a later DROP_PART record are excluded.
///
/// Reading stops at the first broken record, recognized by the error codes checked in the catch block.
/// After a broken record:
///  - if no ADD_PART has been recorded (max_block_number == -1), the file is removed;
///  - otherwise, if this is the default WAL file, it is rotated so new parts are not appended to it.
/// Any other exception is rethrown.
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const StorageMetadataPtr & metadata_snapshot)
{
    std::lock_guard lock(write_mutex);

    MergeTreeData::MutableDataPartsVector parts;
    auto in = disk->readFile(path, DBMS_DEFAULT_BUFFER_SIZE);
    NativeBlockInputStream block_in(*in, 0);
    NameSet dropped_parts;

    while (!in->eof())
    {
        MergeTreeData::MutableDataPartPtr part;
        UInt8 version;
        String part_name;
        Block block;
        ActionType action_type;

        try
        {
            readIntBinary(version, *in);
            if (version != 0)
                throw Exception("Unknown WAL format version: " + toString(version), ErrorCodes::UNKNOWN_FORMAT_VERSION);

            readIntBinary(action_type, *in);
            readStringBinary(part_name, *in);

            if (action_type == ActionType::DROP_PART)
            {
                dropped_parts.insert(part_name);
            }
            else if (action_type == ActionType::ADD_PART)
            {
                /// The restored in-memory part is bound to the disk holding this WAL file.
                auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk);

                part = storage.createPart(
                    part_name,
                    MergeTreeDataPartType::IN_MEMORY,
                    MergeTreePartInfo::fromPartName(part_name, storage.format_version),
                    single_disk_volume,
                    part_name);

                block = block_in.read();
            }
            else
            {
                throw Exception("Unknown action type: " + toString(static_cast<UInt8>(action_type)), ErrorCodes::CORRUPTED_DATA);
            }
        }
        catch (const Exception & e)
        {
            if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
                || e.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION
                || e.code() == ErrorCodes::BAD_DATA_PART_NAME
                || e.code() == ErrorCodes::CORRUPTED_DATA)
            {
                LOG_WARNING(&Poco::Logger::get(storage.getLogName() + " (WriteAheadLog)"),
                    "WAL file '{}' is broken. {}", path, e.displayText());

                /// If file is broken, do not write new parts to it.
                /// But if it contains any part rotate and save them.
                if (max_block_number == -1)
                    disk->remove(path);
                else if (name == DEFAULT_WAL_FILE_NAME)
                    rotate(lock);

                break;
            }
            throw;
        }

        if (action_type == ActionType::ADD_PART)
        {
            MergedBlockOutputStream part_out(part, metadata_snapshot, block.getNamesAndTypesList(), {}, CompressionCodecFactory::instance().get("NONE", {}));

            part->minmax_idx.update(block, storage.minmax_idx_columns);
            part->partition.create(metadata_snapshot, block, 0);
            /// Evaluate the sorting key expression on the block when the table has a sorting key.
            if (metadata_snapshot->hasSortingKey())
                metadata_snapshot->getSortingKey().expression->execute(block);

            part_out.writePrefix();
            part_out.write(block);
            part_out.writeSuffixAndFinalizePart(part);

            min_block_number = std::min(min_block_number, part->info.min_block);
            max_block_number = std::max(max_block_number, part->info.max_block);
            parts.push_back(std::move(part));
        }
    }

    /// DROP_PART records may follow the ADD_PART they cancel, so filter only after the whole file is read.
    MergeTreeData::MutableDataPartsVector result;
    std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
        [&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });

    return result;
}
2020-09-01 22:25:10 +00:00
/// Requests an fsync of the WAL file through the background `sync_task`.
///  - If more than `write_ahead_log_bytes_to_fsync` bytes were written since the last byte-triggered
///    request, the task is scheduled immediately.
///  - Otherwise, if no delayed sync is pending, it is scheduled after `write_ahead_log_interval_ms_to_fsync`.
/// A zero value disables the corresponding trigger. The caller must hold `write_mutex`.
void MergeTreeWriteAheadLog::sync(const std::lock_guard<std::mutex> &)
{
    const auto settings = storage.getSettings();
    const size_t bytes_to_sync = settings->write_ahead_log_bytes_to_fsync;
    const time_t time_to_sync = settings->write_ahead_log_interval_ms_to_fsync;

    const size_t written = out->count();
    const bool enough_bytes = bytes_to_sync != 0 && written - bytes_at_last_sync > bytes_to_sync;

    if (enough_bytes)
    {
        sync_task->schedule();
        bytes_at_last_sync = written;
        return;
    }

    /// At most one delayed sync is outstanding; the task clears `sync_scheduled` when it runs.
    if (time_to_sync != 0 && !sync_scheduled)
    {
        sync_task->scheduleAfter(time_to_sync);
        sync_scheduled = true;
    }
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
{
Int64 min_block;
Int64 max_block;
ReadBufferFromString in(filename);
if (!checkString(WAL_FILE_NAME, in)
|| !checkChar('_', in)
|| !tryReadIntText(min_block, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_block, in))
{
return {};
}
return std::make_pair(min_block, max_block);
}
2020-04-14 19:47:19 +00:00
}