mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #2946 from amosbird/locking
Better locking for StorageBuffer
This commit is contained in:
commit
046137f9c5
@ -277,7 +277,7 @@ public:
|
||||
|
||||
for (size_t try_no = 0; try_no < storage.num_shards; ++try_no)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(storage.buffers[shard_num].mutex, std::try_to_lock_t());
|
||||
std::unique_lock<std::mutex> lock(storage.buffers[shard_num].mutex, std::try_to_lock);
|
||||
|
||||
if (lock.owns_lock())
|
||||
{
|
||||
@ -295,14 +295,16 @@ public:
|
||||
|
||||
/// If you still can not lock anything at once, then we'll wait on mutex.
|
||||
if (!least_busy_buffer)
|
||||
insertIntoBuffer(block, storage.buffers[start_shard_num], std::unique_lock<std::mutex>(storage.buffers[start_shard_num].mutex));
|
||||
else
|
||||
insertIntoBuffer(block, *least_busy_buffer, std::move(least_busy_lock));
|
||||
{
|
||||
least_busy_buffer = &storage.buffers[start_shard_num];
|
||||
least_busy_lock = std::unique_lock<std::mutex>(least_busy_buffer->mutex);
|
||||
}
|
||||
insertIntoBuffer(block, *least_busy_buffer);
|
||||
}
|
||||
private:
|
||||
StorageBuffer & storage;
|
||||
|
||||
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, std::unique_lock<std::mutex> && lock)
|
||||
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
|
||||
{
|
||||
time_t current_time = time(nullptr);
|
||||
|
||||
@ -320,9 +322,7 @@ private:
|
||||
* an exception will be thrown, and new data will not be added to the buffer.
|
||||
*/
|
||||
|
||||
lock.unlock();
|
||||
storage.flushBuffer(buffer, true);
|
||||
lock.lock();
|
||||
storage.flushBuffer(buffer, true, true /* locked */);
|
||||
}
|
||||
|
||||
if (!buffer.first_write_time)
|
||||
@ -459,7 +459,7 @@ void StorageBuffer::flushAllBuffers(const bool check_thresholds)
|
||||
}
|
||||
|
||||
|
||||
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
|
||||
void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool locked)
|
||||
{
|
||||
Block block_to_write;
|
||||
time_t current_time = time(nullptr);
|
||||
@ -468,7 +468,9 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds)
|
||||
size_t bytes = 0;
|
||||
time_t time_passed = 0;
|
||||
|
||||
std::lock_guard<std::mutex> lock(buffer.mutex);
|
||||
std::unique_lock<std::mutex> lock(buffer.mutex, std::defer_lock);
|
||||
if (!locked)
|
||||
lock.lock();
|
||||
|
||||
block_to_write = buffer.data.cloneEmpty();
|
||||
|
||||
|
@ -114,7 +114,7 @@ private:
|
||||
|
||||
void flushAllBuffers(bool check_thresholds = true);
|
||||
/// Reset the buffer. If check_thresholds is set - resets only if thresholds are exceeded.
|
||||
void flushBuffer(Buffer & buffer, bool check_thresholds);
|
||||
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false);
|
||||
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
|
||||
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user