Test changes in MergeTreeReader for local fs

This commit is contained in:
kssenii 2021-10-20 12:00:24 +03:00
parent 31cd71c849
commit 5f24eb10ec
4 changed files with 52 additions and 4 deletions

View File

@ -26,6 +26,7 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int ARGUMENT_OUT_OF_BOUND; extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LOGICAL_ERROR;
} }
@ -53,14 +54,52 @@ void AsynchronousReadBufferFromFileDescriptor::prefetch()
if (prefetch_future.valid()) if (prefetch_future.valid())
return; return;
/// Will request the same amount of data that is read in nextImpl. size_t read_size;
prefetch_buffer.resize(internal_buffer.size()); if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
return;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, *read_until_position);
/// Read range [file_offset_of_buffer_end, read_until_position).
read_size = *read_until_position - file_offset_of_buffer_end - 1;
}
else
{
read_size = internal_buffer.size();
}
prefetch_buffer.resize(read_size);
prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size()); prefetch_future = readInto(prefetch_buffer.data(), prefetch_buffer.size());
} }
void AsynchronousReadBufferFromFileDescriptor::setReadUntilPosition(size_t position)
{
if (prefetch_future.valid())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Prefetch is valid in readUntilPosition");
read_until_position = position;
}
bool AsynchronousReadBufferFromFileDescriptor::nextImpl() bool AsynchronousReadBufferFromFileDescriptor::nextImpl()
{ {
if (read_until_position)
{
/// Everything is already read.
if (file_offset_of_buffer_end == *read_until_position)
return false;
if (file_offset_of_buffer_end > *read_until_position)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Read beyond last offset ({} > {})",
file_offset_of_buffer_end, *read_until_position);
}
if (prefetch_future.valid()) if (prefetch_future.valid())
{ {
/// Read request already in flight. Wait for its completion. /// Read request already in flight. Wait for its completion.
@ -201,4 +240,3 @@ void AsynchronousReadBufferFromFileDescriptor::rewind()
} }
} }

View File

@ -24,6 +24,7 @@ protected:
const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned. const size_t required_alignment = 0; /// For O_DIRECT both file offsets and memory addresses have to be aligned.
size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end(). size_t file_offset_of_buffer_end = 0; /// What offset in file corresponds to working_buffer.end().
std::optional<size_t> read_until_position;
int fd; int fd;
bool nextImpl() override; bool nextImpl() override;
@ -33,6 +34,8 @@ protected:
void finalize(); void finalize();
void setReadUntilPosition(size_t position) override;
public: public:
AsynchronousReadBufferFromFileDescriptor( AsynchronousReadBufferFromFileDescriptor(
AsynchronousReaderPtr reader_, Int32 priority_, AsynchronousReaderPtr reader_, Int32 priority_,
@ -67,4 +70,3 @@ private:
}; };
} }

View File

@ -49,6 +49,7 @@ ln -sf $SRC_PATH/users.d/remote_queries.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/session_log_test.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/memory_profiler.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/ ln -sf $SRC_PATH/users.d/no_fsync_metadata.xml $DEST_SERVER_PATH/users.d/
ln -sf $SRC_PATH/users.d/read_method.xml $DEST_SERVER_PATH/users.d/
# FIXME DataPartsExchange may hang for http_send_timeout seconds # FIXME DataPartsExchange may hang for http_send_timeout seconds
# when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"), # when nobody is going to read from the other side of socket (due to "Fetching of part was cancelled"),

View File

@ -0,0 +1,7 @@
<clickhouse>
<profiles>
<default>
<local_filesystem_read_method>pread_threadpool</local_filesystem_read_method>
</default>
</profiles>
</clickhouse>