Better [#METR-12588].

This commit is contained in:
Alexey Milovidov 2016-04-13 07:15:30 +03:00
parent edc1bb39c7
commit 49cf0f148c
2 changed files with 34 additions and 111 deletions

View File

@ -1,6 +1,7 @@
#pragma once
#include <queue>
#include <boost/intrusive_ptr.hpp>
#include <common/logger_useful.h>
@ -19,6 +20,37 @@ namespace ErrorCodes
}
/// Позволяет ссылаться на строку в блоке и удерживать владение блоком,
/// и таким образом избежать создания временного объекта-строки.
/// Не используется std::shared_ptr, так как не нужно место для weak_count и deleter;
/// не используется Poco::SharedPtr, так как нужно выделять блок и refcount одним куском;
/// не используется Poco::AutoPtr, так как у него нет move конструктора и есть лишние проверки на nullptr;
/// Счётчик ссылок неатомарный, так как используется из одного потока.
namespace detail
{
struct SharedBlock : Block
{
int refcount = 0;
SharedBlock(Block && value_)
: Block(std::move(value_)) {};
};
}
using SharedBlockPtr = boost::intrusive_ptr<detail::SharedBlock>;
inline void intrusive_ptr_add_ref(detail::SharedBlock * ptr)
{
++ptr->refcount;
}
inline void intrusive_ptr_release(detail::SharedBlock * ptr)
{
if (0 == --ptr->refcount)
delete ptr;
}
/** Соединяет несколько сортированных потоков в один.
*/
class MergingSortedBlockInputStream : public IProfilingBlockInputStream
@ -57,115 +89,6 @@ public:
}
protected:
/// Позволяет ссылаться на строку в блоке и удерживать владение блоком,
/// и таким образом избежать создания временного объекта-строки.
/// Не используется std::shared_ptr, так как не нужно место для weak_count и deleter;
/// не используется Poco::SharedPtr, так как нужно выделять блок и refcount одним куском;
/// не используется Poco::AutoPtr, так как у него нет move конструктора и есть лишние проверси на nullptr;
/// всё таки можно было бы использовать boost::intrusive_ptr.
/// Счётчик ссылок неатомарный, так как используется из одного потока.
class SharedBlockPtr
{
private:
struct SharedBlock
{
Block value;
int refcount = 1;
SharedBlock(Block && value_)
: value(std::move(value_)) {};
};
SharedBlock * ptr = nullptr;
public:
SharedBlockPtr() {}
SharedBlockPtr & operator= (Block && value_)
{
if (ptr)
release();
ptr = new SharedBlock(std::move(value_));
return *this;
}
SharedBlockPtr(Block && value_)
{
*this = std::move(value_);
}
SharedBlockPtr & operator= (const SharedBlockPtr & other)
{
if (this == &other)
return *this;
if (ptr)
release();
ptr = const_cast<SharedBlockPtr &>(other).ptr;
if (ptr)
++ptr->refcount;
return *this;
}
SharedBlockPtr(const SharedBlockPtr & other)
{
*this = other;
}
SharedBlockPtr & operator= (SharedBlockPtr && other)
{
if (ptr)
release();
ptr = other.ptr;
other.ptr = nullptr;
return *this;
}
void swap(SharedBlockPtr & other)
{
std::swap(ptr, other.ptr);
}
~SharedBlockPtr()
{
if (ptr)
release();
}
bool operator==(const SharedBlockPtr & other) const
{
return ptr == other.ptr;
}
bool operator!=(const SharedBlockPtr & other) const
{
return ptr != other.ptr;
}
Block * get() { return &ptr->value; }
const Block * get() const { return &ptr->value; }
Block * operator->() { return get(); }
const Block * operator->() const { return get(); }
Block & operator*() { return *get(); }
const Block & operator*() const { return *get(); }
private:
void release()
{
if (--ptr->refcount == 0)
delete ptr;
}
};
struct RowRef
{
ConstColumnPlainPtrs columns;

View File

@ -34,7 +34,7 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
if (shared_block_ptr.get())
continue;
shared_block_ptr = children[i]->read();
shared_block_ptr = new detail::SharedBlock(children[i]->read());
if (shared_block_ptr->rowsInFirstColumn() == 0)
continue;
@ -276,7 +276,7 @@ void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current,
{
if (&cursors[i] == current.impl)
{
source_blocks[i] = children[i]->read();
source_blocks[i] = new detail::SharedBlock(children[i]->read());
if (*source_blocks[i])
{
cursors[i].reset(*source_blocks[i]);