Fix some checks

This commit is contained in:
kssenii 2021-10-19 09:34:06 +03:00
parent d5d4817350
commit 49106f407f
4 changed files with 17 additions and 13 deletions

View File

@ -59,7 +59,11 @@ public:
clock_type = clock_type_;
}
void setReadUntilPosition(size_t position) override { file_in->setReadUntilPosition(position); }
void setReadUntilPosition(size_t position) override
{
if (file_in)
file_in->setReadUntilPosition(position);
}
};
}

View File

@ -28,24 +28,24 @@ namespace ErrorCodes
#if USE_AWS_S3
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t last_offset) const
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path, size_t offset) const
{
return std::make_unique<ReadBufferFromS3>(client_ptr, bucket,
fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, last_offset);
fs::path(metadata.remote_fs_root_path) / path, max_single_read_retries, settings, threadpool_read, offset);
}
#endif
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t last_offset) const
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path, size_t offset) const
{
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read, last_offset);
return std::make_unique<ReadBufferFromWebServer>(fs::path(uri) / path, context, settings, threadpool_read, offset);
}
#if USE_HDFS
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t last_offset) const
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path, size_t offset) const
{
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, last_offset);
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size, offset);
}
#endif
@ -53,7 +53,7 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata & metadata_, const String & path_)
: ReadBuffer(nullptr, 0)
, metadata(metadata_)
, path(path_)
, canonical_path(path_)
{
}
@ -147,7 +147,7 @@ bool ReadBufferFromRemoteFSGather::readImpl()
if (bytes_to_ignore)
current_buf->ignore(bytes_to_ignore);
LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", path);
LOG_DEBUG(&Poco::Logger::get("Gather"), "Reading from path: {}", canonical_path);
auto result = current_buf->next();
swap(*current_buf);
@ -182,7 +182,7 @@ void ReadBufferFromRemoteFSGather::reset()
String ReadBufferFromRemoteFSGather::getFileName() const
{
return path;
return canonical_path;
// if (current_buf)
// return fs::path(metadata.metadata_file_path) / metadata.remote_fs_objects[buf_idx].first;
// return metadata.metadata_file_path;

View File

@ -37,7 +37,7 @@ public:
size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
protected:
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t last_offset) const = 0;
virtual SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t offset) const = 0;
RemoteMetadata metadata;
@ -60,7 +60,7 @@ private:
size_t last_offset = 0;
String path;
String canonical_path;
};

View File

@ -48,7 +48,7 @@ MergeTreeReadPool::MergeTreeReadPool(
do_not_steal_tasks = true;
min_marks_for_concurrent_read = std::max(min_marks_for_concurrent_read, sum_marks_ / threads_);
}
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read_);
fillPerThreadInfo(threads_, sum_marks_, per_part_sum_marks, parts_ranges, min_marks_for_concurrent_read);
}