Antonio Andelic 2022-03-09 09:25:07 +00:00
parent 7b526b25a8
commit df0f5e20d0
4 changed files with 113 additions and 30 deletions

View File

@@ -49,7 +49,7 @@ private:
Block * free_lists[16] {};
public:
- ArenaWithFreeLists(
+ explicit ArenaWithFreeLists(
const size_t initial_size = 4096, const size_t growth_factor = 2,
const size_t linear_growth_threshold = 128 * 1024 * 1024)
: pool{initial_size, growth_factor, linear_growth_threshold}
@@ -74,7 +74,7 @@ public:
ASAN_UNPOISON_MEMORY_REGION(free_block_ptr,
std::max(size, sizeof(Block)));
- const auto res = free_block_ptr->data;
+ auto * res = free_block_ptr->data;
free_block_ptr = free_block_ptr->next;
return res;
}
@@ -93,7 +93,7 @@ public:
/// Insert the released block into the head of the list.
auto & free_block_ptr = free_lists[list_idx];
- const auto old_head = free_block_ptr;
+ auto * old_head = free_block_ptr;
free_block_ptr = reinterpret_cast<Block *>(ptr);
free_block_ptr->next = old_head;
@@ -113,5 +113,35 @@ public:
}
};
+
+ class SynchronizedArenaWithFreeLists : private ArenaWithFreeLists
+ {
+ public:
+     explicit SynchronizedArenaWithFreeLists(
+         const size_t initial_size = 4096, const size_t growth_factor = 2,
+         const size_t linear_growth_threshold = 128 * 1024 * 1024)
+         : ArenaWithFreeLists{initial_size, growth_factor, linear_growth_threshold}
+     {}
+
+     char * alloc(const size_t size)
+     {
+         std::lock_guard lock{mutex};
+         return ArenaWithFreeLists::alloc(size);
+     }
+
+     void free(char * ptr, const size_t size)
+     {
+         std::lock_guard lock{mutex};
+         return ArenaWithFreeLists::free(ptr, size);
+     }
+
+     /// Size of the allocated pool in bytes
+     size_t size() const
+     {
+         std::lock_guard lock{mutex};
+         return ArenaWithFreeLists::size();
+     }
+
+ private:
+     mutable std::mutex mutex;
+ };
}
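The wrapper above takes the place of the plain ArenaWithFreeLists wherever one arena is shared between the ParallelReadBuffer consumer and its reader threads: every operation simply serializes on the mutex and delegates to the private base. A minimal usage sketch (the driver function and its constants are illustrative, not part of the commit):

#include <thread>
#include <vector>

void stressArena(SynchronizedArenaWithFreeLists & arena) /// hypothetical helper
{
    std::vector<std::thread> threads;
    for (size_t t = 0; t < 4; ++t)
        threads.emplace_back([&arena]
        {
            for (size_t i = 0; i < 1000; ++i)
            {
                char * p = arena.alloc(128); /// locks, then pops a free-list block or grows the pool
                p[0] = 0;
                arena.free(p, 128);          /// locks, then pushes the block back onto its free list
            }
        });
    for (auto & th : threads)
        th.join();
}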

View File

@@ -33,7 +33,7 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
if (offset < 0)
throw Exception("Seek position is out of bounds. Offset: " + std::to_string(offset), ErrorCodes::SEEK_POSITION_OUT_OF_BOUND);
- if (!working_buffer.empty() && size_t(offset) >= current_position - working_buffer.size() && offset < current_position)
+ if (!working_buffer.empty() && static_cast<size_t>(offset) >= current_position - working_buffer.size() && offset < current_position)
{
pos = working_buffer.end() - (current_position - offset);
assert(pos >= working_buffer.begin());
@@ -47,11 +47,16 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
= [&](const auto & range) { return static_cast<size_t>(offset) >= range.from && static_cast<size_t>(offset) < range.to; };
std::unique_lock lock{mutex};
+ bool worker_removed = false;
while (!read_workers.empty() && (offset < current_position || !offset_is_in_range(read_workers.front()->range)))
{
read_workers.pop_front();
+ worker_removed = true;
}
+ if (worker_removed)
+ reader_condvar.notify_all();
if (!read_workers.empty())
{
auto & front_worker = read_workers.front();
@@ -61,21 +66,17 @@ off_t ParallelReadBuffer::seek(off_t offset, int whence)
{
next_condvar.wait(lock, [&] { return !segments.empty(); });
- if (static_cast<size_t>(offset) < current_position + segments.front().size())
+ auto next_segment = front_worker->nextSegment();
+ if (static_cast<size_t>(offset) < current_position + next_segment.size())
{
- arena.free(segment->data(), segment->size());
- segment = std::move(segments.front());
- segments.pop_front();
- working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size());
- current_position += segment->size();
- front_worker->range.from += segment->size();
+ current_segment = std::move(next_segment);
+ working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
+ current_position += current_segment.size();
pos = working_buffer.end() - (current_position - offset);
return offset;
}
- current_position += segments.front().size();
- front_worker->range.from += segments.front().size();
- segments.pop_front();
+ current_position += next_segment.size();
}
}
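For concreteness, the fast path kept at the top of seek() (first hunk) rewinds inside the current working buffer without touching any worker; only offsets outside the buffer reach the worker-dropping loop above. A worked example with illustrative numbers, not taken from the commit:

/// Suppose current_position = 4096 and working_buffer.size() = 1024,
/// so the working buffer covers absolute offsets [3072, 4096).
/// seek(3600, SEEK_SET): 3600 >= 4096 - 1024 and 3600 < 4096, therefore
///     pos = working_buffer.end() - (4096 - 3600) = working_buffer.end() - 496
/// and the call returns without discarding a single read worker.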
@@ -130,9 +131,16 @@ bool ParallelReadBuffer::nextImpl()
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Emergency stop");
}
+ bool worker_removed = false;
/// Remove completed units
while (!read_workers.empty() && currentWorkerCompleted())
+ {
read_workers.pop_front();
+ worker_removed = true;
+ }
+ if (worker_removed)
+ reader_condvar.notify_all();
/// All readers processed, stop
if (read_workers.empty() && all_created)
@@ -145,17 +153,11 @@ bool ParallelReadBuffer::nextImpl()
/// Read data from first segment of the first reader
if (!front_worker->segments.empty())
{
- if (segment)
- {
- arena.free(segment->data(), segment->size());
- }
- segment = std::move(front_worker->segments.front());
- front_worker->range.from += segment->size();
- front_worker->segments.pop_front();
+ current_segment = front_worker->nextSegment();
break;
}
}
- working_buffer = internal_buffer = Buffer(segment->data(), segment->data() + segment->size());
+ working_buffer = internal_buffer = Buffer(current_segment.data(), current_segment.data() + current_segment.size());
current_position += working_buffer.size();
return true;
}
@@ -168,7 +170,8 @@ void ParallelReadBuffer::processor()
{
/// Create new read worker and put it into the end of the queue
/// reader_factory is not thread safe, so we call getReader under lock
- std::lock_guard lock(mutex);
+ std::unique_lock lock(mutex);
+ reader_condvar.wait(lock, [this] { return read_workers.size() < pool.getMaxThreads(); });
auto reader = reader_factory->getReader();
if (!reader)
{
@@ -203,7 +206,7 @@ void ParallelReadBuffer::readerThreadFunction(ReadWorkerPtr read_worker)
break;
Buffer buffer = read_worker->reader->buffer();
- std::span new_segment(arena.alloc(buffer.size()), buffer.size());
+ Segment new_segment(buffer.size(), &arena);
memcpy(new_segment.data(), buffer.begin(), buffer.size());
{
/// New data ready to be read
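The unique_lock plus reader_condvar.wait() in processor(), paired with the notify_all() calls added to seek() and nextImpl(), form a classic bounded-queue handshake: new workers are created only while fewer than pool.getMaxThreads() are queued, and the producer wakes as soon as the consumer pops one. A self-contained sketch of that pattern, with illustrative names rather than ClickHouse API:

#include <condition_variable>
#include <deque>
#include <mutex>

struct BoundedQueue
{
    std::mutex mutex;
    std::condition_variable condvar;
    std::deque<int> items;
    const size_t capacity = 8;

    void push(int item) /// producer side, mirrors processor()
    {
        std::unique_lock lock(mutex);
        condvar.wait(lock, [this] { return items.size() < capacity; });
        items.push_back(item);
    }

    void pop() /// consumer side, mirrors the removal loops in seek()/nextImpl()
    {
        bool removed = false;
        {
            std::unique_lock lock(mutex);
            if (!items.empty())
            {
                items.pop_front();
                removed = true;
            }
        }
        if (removed)
            condvar.notify_all(); /// wake a producer blocked in push()
    }
};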

View File

@@ -6,8 +6,6 @@
#include "Common/ArenaWithFreeLists.h"
#include <Common/ThreadPool.h>
- #include <span>
namespace DB
{
@@ -32,6 +30,47 @@ private:
void initializeWorkers();
+
+ class Segment
+ {
+ public:
+     Segment(size_t size_, SynchronizedArenaWithFreeLists * arena_) : arena(arena_), m_data(arena->alloc(size_)), m_size(size_) { }
+
+     Segment() = default;
+
+     Segment(const Segment &) = delete;
+     Segment & operator=(const Segment &) = delete;
+
+     Segment(Segment && other) noexcept : arena(other.arena)
+     {
+         std::swap(m_data, other.m_data);
+         std::swap(m_size, other.m_size);
+     }
+
+     Segment & operator=(Segment && other) noexcept
+     {
+         arena = other.arena;
+         std::swap(m_data, other.m_data);
+         std::swap(m_size, other.m_size);
+         return *this;
+     }
+
+     ~Segment()
+     {
+         if (m_data)
+         {
+             arena->free(m_data, m_size);
+         }
+     }
+
+     auto data() const noexcept { return m_data; }
+     auto size() const noexcept { return m_size; }
+
+ private:
+     SynchronizedArenaWithFreeLists * arena{nullptr};
+     char * m_data{nullptr};
+     size_t m_size{0};
+ };
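Segment is a move-only RAII handle: construction draws m_size bytes from the shared arena and destruction returns them, which removes the manual arena.free() bookkeeping the old std::optional<std::span<char>> member required. The move operations swap rather than null the source, so after a move-assignment the moved-from object carries the target's previous allocation and its destructor hands that allocation back to the arena (safe here because every Segment of a ParallelReadBuffer points at the same arena). A brief usage sketch, assuming an in-scope SynchronizedArenaWithFreeLists named arena:

Segment a(1024, &arena);  /// draws 1024 bytes from the arena
Segment b = std::move(a); /// b owns the bytes; the swap leaves a.m_data == nullptr
/// b's destructor returns the 1024 bytes exactly once;
/// a's destructor sees a null pointer and does nothing.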
public:
struct Range
{
@@ -64,8 +103,17 @@ private:
{
explicit ReadWorker(ReadBufferPtr reader_, const Range & range_) : reader(reader_), range(range_) { }
+ Segment nextSegment()
+ {
+     assert(!segments.empty());
+     auto next_segment = std::move(segments.front());
+     segments.pop_front();
+     range.from += next_segment.size();
+     return next_segment;
+ }
+
ReadBufferPtr reader;
- std::deque<std::span<char>> segments;
+ std::deque<Segment> segments;
bool finished{false};
Range range;
};
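nextSegment() centralizes the pop-and-advance sequence that seek() and nextImpl() previously spelled out by hand, preserving the invariant that range.from is the absolute offset of the first byte this worker has not yet handed over. A consumption sketch under the buffer's mutex (process() is a placeholder, not ClickHouse API):

while (!worker.segments.empty())
{
    Segment s = worker.nextSegment(); /// pops the front segment and advances range.from by s.size()
    process(s.data(), s.size());      /// s returns its bytes to the arena when it goes out of scope
}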
@@ -93,8 +141,9 @@ private:
void onBackgroundException();
void finishAndWait();
- ArenaWithFreeLists arena;
- std::optional<std::span<char>> segment;
+ SynchronizedArenaWithFreeLists arena;
+ Segment current_segment;
ThreadPool pool;
@@ -107,6 +156,7 @@ private:
* deque and data from next reader will be consumed to user.
*/
std::deque<ReadWorkerPtr> read_workers;
+ std::condition_variable reader_condvar;
std::mutex mutex;
/// Triggered when new data available

View File

@@ -306,7 +306,7 @@ namespace
/* use_external_buffer */ false,
/* skip_url_not_found_error */ skip_url_not_found_error);
return wrapReadBufferWithCompressionMethod(
- std::make_unique<ParallelReadBuffer>(std::move(read_buffer_factory), 4),
+ std::make_unique<ParallelReadBuffer>(std::move(read_buffer_factory), 8),
chooseCompressionMethod(request_uri.getPath(), compression_method));
}