Fix error

Alexey Milovidov 2021-01-05 04:49:15 +03:00
parent cbc54e2dd7
commit 5368b20a71
4 changed files with 116 additions and 100 deletions

src/Common/FileChecker.h

@@ -18,6 +18,7 @@ public:
     void update(const String & full_file_path);
     void setEmpty(const String & full_file_path);
     void save() const;
+    bool empty() const { return map.empty(); }
 
     /// Check the files whose parameters are specified in sizes.json
     CheckResults check() const;
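
The predicate added here is the heart of the commit: FileChecker keeps a path-to-size map (persisted as sizes.json in the table directory), and `empty()` reports whether any file was ever registered. A minimal standalone sketch of that contract — `FileCheckerSketch` is a hypothetical stand-in, not the real class, which also persists the map to disk and can repair files to the recorded sizes:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    /// Hypothetical stand-in for FileChecker; only the contract matters here.
    class FileCheckerSketch
    {
    public:
        /// Register a file with size 0: "known to the table, nothing written yet".
        void setEmpty(const std::string & path) { map[path] = 0; }

        /// True iff no file was ever registered, i.e. the table was never written to.
        bool empty() const { return map.empty(); }

    private:
        std::map<std::string, std::uint64_t> map;   /// path -> last saved size
    };

    int main()
    {
        FileCheckerSketch checker;
        assert(checker.empty());          /// fresh table: readers can return immediately
        checker.setEmpty("data.bin");     /// first write registers its files
        assert(!checker.empty());
    }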

src/Storages/StorageLog.cpp

@@ -113,93 +113,6 @@ private:
 };
 
-
-class LogBlockOutputStream final : public IBlockOutputStream
-{
-public:
-    explicit LogBlockOutputStream(
-        StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
-        : storage(storage_)
-        , metadata_snapshot(metadata_snapshot_)
-        , lock(std::move(lock_))
-        , marks_stream(
-            storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
-    {
-        if (!lock)
-            throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
-    }
-
-    ~LogBlockOutputStream() override
-    {
-        try
-        {
-            if (!done)
-            {
-                /// Rollback partial writes.
-                streams.clear();
-                storage.file_checker.repair();
-            }
-        }
-        catch (...)
-        {
-            tryLogCurrentException(__PRETTY_FUNCTION__);
-        }
-    }
-
-    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
-    void write(const Block & block) override;
-    void writeSuffix() override;
-
-private:
-    StorageLog & storage;
-    StorageMetadataPtr metadata_snapshot;
-    std::unique_lock<std::shared_timed_mutex> lock;
-    bool done = false;
-
-    struct Stream
-    {
-        Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
-            plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append)),
-            compressed(*plain, std::move(codec), max_compress_block_size),
-            plain_offset(disk->getFileSize(data_path))
-        {
-        }
-
-        std::unique_ptr<WriteBuffer> plain;
-        CompressedWriteBuffer compressed;
-
-        size_t plain_offset; /// How many bytes were in the file at the time the LogBlockOutputStream was created.
-
-        void finalize()
-        {
-            compressed.next();
-            plain->next();
-        }
-    };
-
-    using Mark = StorageLog::Mark;
-    using MarksForColumns = std::vector<std::pair<size_t, Mark>>;
-
-    using FileStreams = std::map<String, Stream>;
-    FileStreams streams;
-
-    using WrittenStreams = std::set<String>;
-
-    std::unique_ptr<WriteBuffer> marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
-
-    using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
-    using SerializeStates = std::map<String, SerializeState>;
-    SerializeStates serialize_states;
-
-    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
-
-    void writeData(const String & name, const IDataType & type, const IColumn & column,
-        MarksForColumns & out_marks,
-        WrittenStreams & written_streams);
-
-    void writeMarks(MarksForColumns && marks);
-};
-
 
 Chunk LogSource::generate()
 {
     Block res;
@@ -207,7 +120,7 @@ Chunk LogSource::generate()
     if (rows_read == rows_limit)
         return {};
 
-    if (storage.disk->isDirectoryEmpty(storage.table_path))
+    if (storage.file_checker.empty())
         return {};
 
     /// How many rows to read for the next block.
@@ -284,6 +197,101 @@ void LogSource::readData(const String & name, const IDataType & type, IColumn &
 }
 
 
+class LogBlockOutputStream final : public IBlockOutputStream
+{
+public:
+    explicit LogBlockOutputStream(
+        StorageLog & storage_, const StorageMetadataPtr & metadata_snapshot_, std::unique_lock<std::shared_timed_mutex> && lock_)
+        : storage(storage_)
+        , metadata_snapshot(metadata_snapshot_)
+        , lock(std::move(lock_))
+        , marks_stream(
+            storage.disk->writeFile(storage.marks_file_path, 4096, WriteMode::Rewrite))
+    {
+        if (!lock)
+            throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        /// If there were no files, add info to rollback in case of error.
+        if (storage.file_checker.empty())
+        {
+            for (const auto & file : storage.files)
+                storage.file_checker.setEmpty(file.second.data_file_path);
+            storage.file_checker.save();
+        }
+    }
+
+    ~LogBlockOutputStream() override
+    {
+        try
+        {
+            if (!done)
+            {
+                /// Rollback partial writes.
+                streams.clear();
+                storage.file_checker.repair();
+            }
+        }
+        catch (...)
+        {
+            tryLogCurrentException(__PRETTY_FUNCTION__);
+        }
+    }
+
+    Block getHeader() const override { return metadata_snapshot->getSampleBlock(); }
+    void write(const Block & block) override;
+    void writeSuffix() override;
+
+private:
+    StorageLog & storage;
+    StorageMetadataPtr metadata_snapshot;
+    std::unique_lock<std::shared_timed_mutex> lock;
+    bool done = false;
+
+    struct Stream
+    {
+        Stream(const DiskPtr & disk, const String & data_path, CompressionCodecPtr codec, size_t max_compress_block_size) :
+            plain(disk->writeFile(data_path, max_compress_block_size, WriteMode::Append)),
+            compressed(*plain, std::move(codec), max_compress_block_size),
+            plain_offset(disk->getFileSize(data_path))
+        {
+        }
+
+        std::unique_ptr<WriteBuffer> plain;
+        CompressedWriteBuffer compressed;
+
+        size_t plain_offset; /// How many bytes were in the file at the time the LogBlockOutputStream was created.
+
+        void finalize()
+        {
+            compressed.next();
+            plain->next();
+        }
+    };
+
+    using Mark = StorageLog::Mark;
+    using MarksForColumns = std::vector<std::pair<size_t, Mark>>;
+
+    using FileStreams = std::map<String, Stream>;
+    FileStreams streams;
+
+    using WrittenStreams = std::set<String>;
+
+    std::unique_ptr<WriteBuffer> marks_stream; /// Declared below `lock` to make the file open when rwlock is captured.
+
+    using SerializeState = IDataType::SerializeBinaryBulkStatePtr;
+    using SerializeStates = std::map<String, SerializeState>;
+    SerializeStates serialize_states;
+
+    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenStreams & written_streams);
+
+    void writeData(const String & name, const IDataType & type, const IColumn & column,
+        MarksForColumns & out_marks,
+        WrittenStreams & written_streams);
+
+    void writeMarks(MarksForColumns && marks);
+};
+
 
 void LogBlockOutputStream::write(const Block & block)
 {
     metadata_snapshot->check(block, true);
@@ -477,10 +485,6 @@ StorageLog::StorageLog(
         addFiles(column.name, *column.type);
 
     marks_file_path = table_path + DBMS_STORAGE_LOG_MARKS_FILE_NAME;
-
-    if (!attach)
-        for (const auto & file : files)
-            file_checker.setEmpty(file.second.data_file_path);
 }
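
The registration that the last hunk removes from the constructor reappears above, inside the output stream, as a small rollback protocol: before the first write, every data file is registered with size 0 and the state is saved, so a failed write can later be undone by repair(), which truncates files back to their recorded sizes. A sketch of that protocol with disk I/O replaced by an in-memory map — all names here are illustrative stand-ins, not the ClickHouse classes:

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <string>

    struct FakeDisk
    {
        std::map<std::string, std::string> files;   /// path -> contents
    };

    struct CheckerSketch
    {
        std::map<std::string, std::uint64_t> saved_sizes;

        bool empty() const { return saved_sizes.empty(); }
        void setEmpty(const std::string & path) { saved_sizes[path] = 0; }

        /// repair(): truncate every registered file back to its saved size.
        void repair(FakeDisk & disk) const
        {
            for (const auto & [path, size] : saved_sizes)
                disk.files[path].resize(size);
        }
    };

    int main()
    {
        FakeDisk disk;
        CheckerSketch checker;

        /// What the output-stream constructors in this commit do before the first write:
        if (checker.empty())
            checker.setEmpty("data.bin");

        disk.files["data.bin"] = "partial write";   /// the write "fails" midway here
        checker.repair(disk);                       /// rollback: truncate to saved size (0)
        assert(disk.files["data.bin"].empty());
    }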

src/Storages/StorageStripeLog.cpp

@@ -98,6 +98,9 @@ public:
 protected:
     Chunk generate() override
     {
+        if (storage.file_checker.empty())
+            return {};
+
         Block res;
         start();
@@ -170,6 +173,13 @@ public:
     {
         if (!lock)
             throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        if (storage.file_checker.empty())
+        {
+            storage.file_checker.setEmpty(storage.table_path + "data.bin");
+            storage.file_checker.setEmpty(storage.table_path + "index.mrk");
+            storage.file_checker.save();
+        }
     }
 
     ~StripeLogBlockOutputStream() override
@@ -264,9 +274,6 @@ StorageStripeLog::StorageStripeLog(
     {
         /// create directories if they do not exist
         disk->createDirectories(table_path);
-
-        file_checker.setEmpty(table_path + "data.bin");
-        file_checker.setEmpty(table_path + "index.mrk");
     }
     else
     {
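
The read paths now ask the checker, not the filesystem, whether the table holds data: `generate()` returns an empty chunk when nothing was ever registered. One plausible reason for dropping the earlier `isDirectoryEmpty` check in StorageLog — an assumption, not stated in the commit — is that the table directory is non-empty even before the first insert (for example once sizes.json has been saved), so directory emptiness is the wrong predicate. A toy illustration of the difference:

    #include <cassert>
    #include <map>
    #include <string>

    int main()
    {
        /// Table directory right after creation, before any INSERT: the checker's
        /// metadata file may already exist, but no data file was registered.
        std::map<std::string, std::string> table_dir = {{"sizes.json", "{}"}};
        std::map<std::string, size_t> registered_files;   /// the checker's map

        bool is_directory_empty = table_dir.empty();
        bool checker_empty = registered_files.empty();

        assert(!is_directory_empty);   /// old predicate: wrongly suggests data exists
        assert(checker_empty);         /// new predicate: correctly says "never written"
    }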

src/Storages/StorageTinyLog.cpp

@@ -121,7 +121,7 @@ Chunk TinyLogSource::generate()
 {
     Block res;
 
-    if (is_finished || (!streams.empty() && streams.begin()->second->compressed.eof()))
+    if (is_finished || file_sizes.empty() || (!streams.empty() && streams.begin()->second->compressed.eof()))
     {
         /** Close the files (before destroying the object).
           * When many sources are created, but simultaneously reading only a few of them,
@@ -196,6 +196,14 @@ public:
     {
         if (!lock)
             throw Exception("Lock timeout exceeded", ErrorCodes::TIMEOUT_EXCEEDED);
+
+        /// If there were no files, add info to rollback in case of error.
+        if (storage.file_checker.empty())
+        {
+            for (const auto & file : storage.files)
+                storage.file_checker.setEmpty(file.second.data_file_path);
+            storage.file_checker.save();
+        }
     }
 
     ~TinyLogBlockOutputStream() override
@@ -393,10 +401,6 @@ StorageTinyLog::StorageTinyLog(
     for (const auto & col : storage_metadata.getColumns().getAllPhysical())
         addFiles(col.name, *col.type);
-
-    if (!attach)
-        for (const auto & file : files)
-            file_checker.setEmpty(file.second.data_file_path);
 }
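
All three output streams in this commit share the destructor pattern visible in the StorageLog hunks: a `done` flag is set by writeSuffix(), and if the stream is destroyed without it (the query failed mid-write), the destructor drops the write buffers and lets the checker roll the files back, swallowing any exception because destructors must not throw. A compilable sketch of that shape — `WriterSketch` and `rollback()` are hypothetical stand-ins for the real stream and for `streams.clear()` plus `file_checker.repair()`:

    #include <iostream>

    struct WriterSketch
    {
        bool done = false;

        void writeSuffix() { done = true; }     /// normal finish: flush, mark done

        void rollback() { std::cout << "rolled back partial writes\n"; }

        ~WriterSketch()
        {
            try
            {
                if (!done)
                    rollback();                 /// query failed mid-write
            }
            catch (...)
            {
                /// Exceptions must not escape a destructor; the real code
                /// logs via tryLogCurrentException(__PRETTY_FUNCTION__).
            }
        }
    };

    int main()
    {
        { WriterSketch w; }                     /// destroyed early -> rollback runs
        { WriterSketch w; w.writeSuffix(); }    /// clean finish -> no rollback
    }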