fix

fix
This commit is contained in:
feng lv 2021-10-06 08:08:49 +00:00
parent f2a62ef9ee
commit acdb8233c3
5 changed files with 120 additions and 26 deletions

View File

@ -47,8 +47,8 @@ private:
const std::string path;
/// Note, in order to avoid data race found by fuzzer, put events before dw,
/// such that when this class desctruction, dw will be destructed before events.
/// The data race is because dw create a seperate thread to monitor file events
/// such that when this class destruction, dw will be destructed before events.
/// The data race is because dw create a separate thread to monitor file events
/// and put into events, then if we destruct events first, the monitor thread still
/// running, it may access events during events destruction, leads to data race.
Events events;

View File

@ -36,21 +36,17 @@ FileLogSource::FileLogSource(
metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID()))
{
buffer = std::make_unique<ReadBufferFromFileLog>(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_);
const auto & file_infos = storage.getFileInfos();
size_t files_per_stream = file_infos.file_names.size() / max_streams_number;
start = stream_number * files_per_stream;
end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream;
}
void FileLogSource::onFinish()
{
auto & file_infos = storage.getFileInfos();
size_t files_per_stream = file_infos.file_names.size() / max_streams_number;
size_t start = stream_number * files_per_stream;
size_t end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream;
/// Each stream is responsible for closing its own files and storing their meta
for (size_t i = start; i < end; ++i)
{
storage.closeFileAndStoreMeta(file_infos.file_names[i]);
}
storage.closeFilesAndStoreMeta(start, end);
}
Chunk FileLogSource::generate()
@ -60,6 +56,7 @@ Chunk FileLogSource::generate()
/// ISource has no onFinish hook, so we call it ourselves
/// when no records are returned, in order to close the files
onFinish();
finished = true;
return {};
}
@ -105,7 +102,11 @@ Chunk FileLogSource::generate()
}
if (total_rows == 0)
{
onFinish();
finished = true;
return {};
}
auto result_block = non_virtual_header.cloneWithColumns(executor.getResultColumns());
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
@ -121,6 +122,9 @@ Chunk FileLogSource::generate()
auto converting_actions = std::make_shared<ExpressionActions>(std::move(converting_dag));
converting_actions->execute(result_block);
/// After generating each block, store the metas on disk
storage.storeMetas(start, end);
return Chunk(result_block.getColumns(), result_block.rows());
}

View File

@ -30,6 +30,12 @@ public:
void onFinish();
virtual ~FileLogSource() override
{
if (!finished)
onFinish();
}
protected:
Chunk generate() override;
@ -49,6 +55,11 @@ private:
Block non_virtual_header;
Block virtual_header;
/// The start and end positions of the files this stream is responsible for;
/// the range is half-open: end is not included
int start;
int end;
};
}

View File

@ -223,17 +223,26 @@ void StorageFileLog::serialize() const
{
std::filesystem::create_directories(root_meta_path);
}
for (const auto & it : file_infos.meta_by_inode)
for (const auto & [inode, meta] : file_infos.meta_by_inode)
{
auto full_name = getFullMetaPath(it.second.file_name);
auto full_name = getFullMetaPath(meta.file_name);
if (!std::filesystem::exists(full_name))
{
FS::createFile(full_name);
}
else
{
auto last_pos = getLastWrittenPos(meta.file_name);
if (last_pos > meta.last_writen_position)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Last stored last written pos is bigger than current last written pos need to store for meta file {}.",
full_name);
}
WriteBufferFromFile out(full_name);
writeIntText(it.first, out);
writeIntText(inode, out);
writeChar('\n', out);
writeIntText(it.second.last_writen_position, out);
writeIntText(meta.last_writen_position, out);
}
}
@ -248,6 +257,15 @@ void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const
{
FS::createFile(full_name);
}
else
{
auto last_pos = getLastWrittenPos(file_meta.file_name);
if (last_pos > file_meta.last_writen_position)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Last stored last written pos is bigger than current last written pos need to store for meta file {}.",
full_name);
}
WriteBufferFromFile out(full_name);
writeIntText(inode, out);
writeChar('\n', out);
@ -288,6 +306,23 @@ void StorageFileLog::deserialize()
}
}
UInt64 StorageFileLog::getLastWrittenPos(const String & file_name) const
{
ReadBufferFromFile in(getFullMetaPath(file_name));
UInt64 _, last_written_pos;
if (!tryReadIntText(_, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", getFullMetaPath(file_name));
}
assertChar('\n', in);
if (!tryReadIntText(last_written_pos, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", getFullMetaPath(file_name));
}
return last_written_pos;
}
UInt64 StorageFileLog::getInode(const String & file_name)
{
struct stat file_stat;
@ -307,7 +342,19 @@ Pipe StorageFileLog::read(
size_t /* max_block_size */,
unsigned /* num_streams */)
{
/// We need this lock, in case read and streamToViews execute at the same time
auto table_id = getStorageID();
size_t dependencies_count = DatabaseCatalog::instance().getDependencies(table_id).size();
/// If there are MVs depended on this table, we just forbid reading
if (dependencies_count)
{
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Can not read from table {}, because it has been depended by other tables.",
table_id.getTableName());
}
/// We need this lock, in case read and streamToViews execute at the same time.
/// In case of MV attached during reading
std::lock_guard<std::mutex> lock(status_mutex);
updateFileInfos();
@ -440,14 +487,41 @@ void StorageFileLog::closeFilesAndStoreMeta()
serialize();
}
void StorageFileLog::closeFileAndStoreMeta(const String & file_name)
void StorageFileLog::closeFilesAndStoreMeta(int start, int end)
{
auto & file_ctx = findInMap(file_infos.context_by_name, file_name);
if (file_ctx.reader.is_open())
file_ctx.reader.close();
#ifndef NDEBUG
assert(start >= 0);
assert(start < end);
assert(end <= file_infos.file_names.size());
#endif
auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode);
serialize(file_ctx.inode, meta);
for (int i = start; i < end; ++i)
{
auto & file_ctx = findInMap(file_infos.context_by_name, file_infos.file_names[i]);
if (file_ctx.reader.is_open())
file_ctx.reader.close();
auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode);
serialize(file_ctx.inode, meta);
}
}
void StorageFileLog::storeMetas(int start, int end)
{
#ifndef NDEBUG
assert(start >= 0);
assert(start < end);
assert(end <= file_infos.file_names.size());
#endif
for (int i = start; i < end; ++i)
{
auto & file_ctx = findInMap(file_infos.context_by_name, file_infos.file_names[i]);
auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode);
serialize(file_ctx.inode, meta);
}
}
size_t StorageFileLog::getMaxBlockSize() const

View File

@ -97,10 +97,14 @@ public:
static UInt64 getInode(const String & file_name);
void openFilesAndSetPos();
/// Used in shutdown()
void closeFilesAndStoreMeta();
/// Used in FileSource
void closeFileAndStoreMeta(const String & file_name);
/// Used in FileLogSource when finish generating all blocks
void closeFilesAndStoreMeta(int start, int end);
/// Used in FileLogSource after generating every block
void storeMetas(int start, int end);
static void assertStreamGood(const std::ifstream & reader);
@ -183,6 +187,7 @@ private:
void serialize(UInt64 inode, const FileMeta & file_meta) const;
void deserialize();
UInt64 getLastWrittenPos(const String & file_name) const;
};
}