This commit is contained in:
feng lv 2021-10-13 05:23:00 +00:00
parent 0409b775bf
commit 74e1900034
3 changed files with 21 additions and 15 deletions

View File

@ -24,14 +24,14 @@ DirectoryWatcherBase::DirectoryWatcherBase(
: WithContext(context_->getGlobalContext()), owner(owner_), path(path_), event_mask(event_mask_)
{
if (!std::filesystem::exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The path {} does not exist.", path);
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "The path {} does not exist", path);
if (!std::filesystem::is_directory(path))
throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "The path {} does not a directory.", path);
throw Exception(ErrorCodes::DIRECTORY_DOESNT_EXIST, "The path {} does not a directory", path);
fd = inotify_init();
if (fd == -1)
throw Exception("cannot initialize inotify", ErrorCodes::IO_SETUP_ERROR);
throw Exception("Cannot initialize inotify", ErrorCodes::IO_SETUP_ERROR);
watch_task = getContext()->getMessageBrokerSchedulePool().createTask("directory_watch", [this] { watchFunc(); });
start();
@ -54,7 +54,7 @@ void DirectoryWatcherBase::watchFunc()
int wd = inotify_add_watch(fd, path.c_str(), mask);
if (wd == -1)
{
owner.onError(Exception(ErrorCodes::IO_SETUP_ERROR, "Watch directory {} failed.", path));
owner.onError(Exception(ErrorCodes::IO_SETUP_ERROR, "Watch directory {} failed", path));
}
std::string buffer;
@ -62,11 +62,17 @@ void DirectoryWatcherBase::watchFunc()
fd_set fds;
while (!stopped)
{
#if defined(__clang__)
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wreserved-identifier"
FD_ZERO(&fds);
#pragma clang diagnostic pop
#else
FD_ZERO(&fds);
#endif
FD_SET(fd, &fds);
struct timeval tv;

View File

@ -107,7 +107,7 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_
auto & file_meta = StorageFileLog::findInMap(file_infos.meta_by_inode, file_ctx.inode);
if (!file_ctx.reader)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Ifstream for file {} does not initialized.", file_meta.file_name);
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Ifstream for file {} does not initialized", file_meta.file_name);
auto & reader = file_ctx.reader.value();
Record record;

View File

@ -270,7 +270,7 @@ void StorageFileLog::deserialize()
{
throw Exception(
ErrorCodes::NOT_REGULAR_FILE,
"The file {} under {} is not a regular file when deserializing meta files.",
"The file {} under {} is not a regular file when deserializing meta files",
dir_entry.path().c_str(),
root_meta_path);
}
@ -281,12 +281,12 @@ void StorageFileLog::deserialize()
if (!tryReadIntText(inode, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", dir_entry.path().c_str());
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_entry.path().c_str());
}
assertChar('\n', in);
if (!tryReadIntText(last_written_pos, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", dir_entry.path().c_str());
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", dir_entry.path().c_str());
}
meta.file_name = dir_entry.path().filename();
@ -322,7 +322,7 @@ Pipe StorageFileLog::read(
{
throw Exception(
ErrorCodes::CANNOT_READ_ALL_DATA,
"Can not read from table {}, because it has been depended by other tables.",
"Can not read from table {}, because it has been depended by other tables",
table_id.getTableName());
}
@ -433,7 +433,7 @@ void StorageFileLog::assertStreamGood(const std::ifstream & reader)
{
if (!reader.good())
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Stream is in bad state.");
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Stream is in bad state");
}
}
@ -457,7 +457,7 @@ void StorageFileLog::openFilesAndSetPos()
auto & meta = findInMap(file_infos.meta_by_inode, file_ctx.inode);
if (meta.last_writen_position > static_cast<UInt64>(file_end))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "File {} has been broken.", file);
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "File {} has been broken", file);
}
/// update file end at the monment, used in ReadBuffer and serialize
meta.last_open_end = file_end;
@ -516,16 +516,16 @@ void StorageFileLog::checkOffsetIsValid(const String & full_name, UInt64 offset)
if (!tryReadIntText(_, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", full_name);
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", full_name);
}
assertChar('\n', in);
if (!tryReadIntText(last_written_pos, in))
{
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed.", full_name);
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Read meta file {} failed", full_name);
}
if (last_written_pos > offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Last stored last_written_pos in meta file {} is bigger than current last_written_pos.", full_name);
ErrorCodes::LOGICAL_ERROR, "Last stored last_written_pos in meta file {} is bigger than current last_written_pos", full_name);
}
size_t StorageFileLog::getMaxBlockSize() const
@ -633,7 +633,7 @@ bool StorageFileLog::streamToViews()
auto table_id = getStorageID();
auto table = DatabaseCatalog::instance().getTable(table_id, getContext());
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist", ErrorCodes::LOGICAL_ERROR);
auto metadata_snapshot = getInMemoryMetadataPtr();
auto max_streams_number = std::min<UInt64>(filelog_settings->max_threads.value, file_infos.file_names.size());