mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
fix
fix test fix fix fix exception handle
This commit is contained in:
parent
f637f4f2d1
commit
9456c67486
@ -10,6 +10,10 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_READ_FROM_ISTREAM;
|
||||
}
|
||||
|
||||
ReadBufferFromFileLog::ReadBufferFromFileLog(
|
||||
StorageFileLog & storage_,
|
||||
@ -97,7 +101,7 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_
|
||||
size_t read_records_size = 0;
|
||||
|
||||
const auto & file_names = storage.getFileNames();
|
||||
auto & file_status = storage.getFileStatuses();
|
||||
auto & file_statuses = storage.getFileStatuses();
|
||||
|
||||
size_t files_per_stream = file_names.size() / max_streams_number;
|
||||
size_t start = stream_number * files_per_stream;
|
||||
@ -105,40 +109,75 @@ void ReadBufferFromFileLog::readNewRecords(ReadBufferFromFileLog::Records & new_
|
||||
|
||||
for (size_t i = start; i < end; ++i)
|
||||
{
|
||||
auto & file = file_status[file_names[i]];
|
||||
auto & file = file_statuses[file_names[i]];
|
||||
if (file.status == StorageFileLog::FileStatus::NO_CHANGE)
|
||||
continue;
|
||||
|
||||
auto reader = std::ifstream(file_names[i]);
|
||||
|
||||
/// check if ifstream is good. For example, if the file deleted during streamToViews,
|
||||
/// this will return false because file does not exist anymore.
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
reader.seekg(0, reader.end);
|
||||
assert(reader.good());
|
||||
/// Exception may happen in seekg and tellg, then badbit will be set
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
auto stream_end = reader.tellg();
|
||||
assert(reader.good());
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
/// file may broken(for example truncate), mark this file to BROKEN,
|
||||
/// should be removed in next updateFileStatuses call
|
||||
if (file.last_read_position > static_cast<size_t>(stream_end))
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
reader.seekg(file.last_read_position);
|
||||
assert(reader.good());
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
Record record;
|
||||
while (read_records_size < need_records_size && reader.tellg() < stream_end)
|
||||
{
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
std::getline(reader, record);
|
||||
new_records.emplace_back(record);
|
||||
++read_records_size;
|
||||
}
|
||||
|
||||
file.last_read_position = reader.tellg();
|
||||
assert(reader.good());
|
||||
auto current_position = reader.tellg();
|
||||
if (!reader.good())
|
||||
{
|
||||
throw Exception("Can not read from file " + file_names[i] + ", stream broken.", ErrorCodes::CANNOT_READ_FROM_ISTREAM);
|
||||
}
|
||||
|
||||
if (reader.tellg() == stream_end)
|
||||
file.last_read_position = current_position;
|
||||
|
||||
/// stream reach to end
|
||||
if (current_position == stream_end && file.status != StorageFileLog::FileStatus::BROKEN)
|
||||
{
|
||||
file.status = StorageFileLog::FileStatus::NO_CHANGE;
|
||||
/// All ifstream reach end
|
||||
if (i == end - 1)
|
||||
{
|
||||
stream_out = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// All ifstream reach end or broken
|
||||
if (i == end - 1 && (file.status == StorageFileLog::FileStatus::NO_CHANGE || file.status == StorageFileLog::FileStatus::BEGIN))
|
||||
{
|
||||
stream_out = true;
|
||||
}
|
||||
|
||||
if (read_records_size == need_records_size)
|
||||
|
@ -432,7 +432,7 @@ bool StorageFileLog::updateFileStatuses()
|
||||
std::vector<String> valid_files;
|
||||
for (const auto & file_name : file_names)
|
||||
{
|
||||
if (file_statuses.at(file_name).status == FileStatus::REMOVED)
|
||||
if (file_statuses.at(file_name).status == FileStatus::REMOVED || file_statuses.at(file_name).status == FileStatus::BROKEN)
|
||||
{
|
||||
file_statuses.erase(file_name);
|
||||
}
|
||||
|
@ -28,7 +28,8 @@ public:
|
||||
BEGIN,
|
||||
NO_CHANGE,
|
||||
UPDATED,
|
||||
REMOVED
|
||||
REMOVED,
|
||||
BROKEN
|
||||
};
|
||||
|
||||
using Files = std::vector<String>;
|
||||
|
@ -1,4 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
# Tags: long, no-parallel
|
||||
|
||||
set -eu
|
||||
|
||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
@ -49,7 +51,7 @@ rm ${user_files_path}/logs/e.txt
|
||||
|
||||
# the max value of reschedule time is 32s, so 40s should
|
||||
# always be enough to finish streamToViews
|
||||
for _ in {1..400}
|
||||
for _ in {1..150}
|
||||
do
|
||||
sleep 0.1
|
||||
done
|
||||
@ -57,10 +59,11 @@ done
|
||||
${CLICKHOUSE_CLIENT} --query "select * from mv order by k;"
|
||||
|
||||
echo 111, 111 >> ${user_files_path}/logs/c.txt
|
||||
truncate ${user_files_path}/logs/d.txt --size 0
|
||||
|
||||
# the max value of reschedule time is 32s, so 40s should
|
||||
# always be enough to finish streamToViews
|
||||
for _ in {1..400}
|
||||
for _ in {1..150}
|
||||
do
|
||||
sleep 0.1
|
||||
done
|
||||
|
Loading…
Reference in New Issue
Block a user