Get rid of Poco::SharedPtr [#METR-21503].

This commit is contained in:
Alexey Milovidov 2016-05-28 20:31:50 +03:00
parent 7ab7917d12
commit c518abe882
25 changed files with 74 additions and 87 deletions

View File

@ -12,8 +12,6 @@
#include <DB/IO/WriteBuffer.h>
#include <DB/IO/WriteHelpers.h>
#include <Poco/SharedPtr.h>
namespace DB
{
@ -49,71 +47,56 @@ namespace ErrorCodes
* определяющий, какие отдельные значения надо уничтожать, а какие - нет.
* Ясно, что этот метод имел бы существенно ненулевую цену.
*/
class ColumnAggregateFunction final : public IColumn
class ColumnAggregateFunction final : public IColumn, public std::enable_shared_from_this<ColumnAggregateFunction>
{
public:
using Container_t = PaddedPODArray<AggregateDataPtr>;
private:
Arenas arenas; /// Пулы, в которых выделены состояния агрегатных функций.
/// Пулы, в которых выделены состояния агрегатных функций.
Arenas arenas;
struct Holder
{
using Ptr = Poco::SharedPtr<Holder>;
/// Используется для уничтожения состояний и для финализации значений.
AggregateFunctionPtr func;
AggregateFunctionPtr func; /// Используется для уничтожения состояний и для финализации значений.
const Ptr src; /// Источник. Используется, если данный столбец создан из другого и использует все или часть его значений.
Container_t data; /// Массив указателей на состояния агрегатных функций, расположенных в пулах.
/// Источник. Используется (удерживает источник от уничтожения),
/// если данный столбец создан из другого и использует все или часть его значений.
const std::shared_ptr<const ColumnAggregateFunction> src;
Holder(const AggregateFunctionPtr & func_) : func(func_) {}
Holder(const Ptr & src_) : func(src_->func), src(src_) {}
~Holder()
{
if (!func->hasTrivialDestructor() && src.isNull())
for (auto val : data)
func->destroy(val);
}
void popBack(size_t n)
{
size_t size = data.size();
size_t new_size = size - n;
if (src.isNull())
for (size_t i = new_size; i < size; ++i)
func->destroy(data[i]);
data.resize_assume_reserved(new_size);
}
};
Holder::Ptr holder; /// NOTE Вместо этого можно было бы унаследовать IColumn от enable_shared_from_this.
/// Массив указателей на состояния агрегатных функций, расположенных в пулах.
Container_t data;
public:
/// Создать столбец на основе другого.
ColumnAggregateFunction(const ColumnAggregateFunction & src)
: arenas(src.arenas), holder(new Holder(src.holder))
ColumnAggregateFunction(const ColumnAggregateFunction & other)
: arenas(other.arenas), func(other.func), src(other.shared_from_this())
{
}
ColumnAggregateFunction(const AggregateFunctionPtr & func_)
: holder(new Holder(func_))
: func(func_)
{
}
ColumnAggregateFunction(const AggregateFunctionPtr & func_, const Arenas & arenas_)
: arenas(arenas_), holder(new Holder(func_))
: arenas(arenas_), func(func_)
{
}
~ColumnAggregateFunction()
{
if (!func->hasTrivialDestructor() && src)
for (auto val : data)
func->destroy(val);
}
void set(const AggregateFunctionPtr & func_)
{
holder->func = func_;
func = func_;
}
AggregateFunctionPtr getAggregateFunction() { return holder->func; }
AggregateFunctionPtr getAggregateFunction() const { return holder->func; }
AggregateFunctionPtr getAggregateFunction() { return func; }
AggregateFunctionPtr getAggregateFunction() const { return func; }
/// Захватить владение ареной.
void addArena(ArenaPtr arena_)
@ -136,7 +119,7 @@ public:
ColumnPtr cloneEmpty() const override
{
return std::make_shared<ColumnAggregateFunction>(holder->func, Arenas(1, std::make_shared<Arena>()));
return std::make_shared<ColumnAggregateFunction>(func, Arenas(1, std::make_shared<Arena>()));
};
Field operator[](size_t n) const override
@ -144,7 +127,7 @@ public:
Field field = String();
{
WriteBufferFromString buffer(field.get<String &>());
holder.get()->func->serialize(getData()[n], buffer);
func->serialize(getData()[n], buffer);
}
return field;
}
@ -154,7 +137,7 @@ public:
res = String();
{
WriteBufferFromString buffer(res.get<String &>());
holder.get()->func->serialize(getData()[n], buffer);
func->serialize(getData()[n], buffer);
}
}
@ -176,7 +159,7 @@ public:
/// Объединить состояние в последней строке с заданным
void insertMergeFrom(const IColumn & src, size_t n)
{
holder.get()->func.get()->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n]);
func->merge(getData().back(), static_cast<const ColumnAggregateFunction &>(src).getData()[n]);
}
Arena & createOrGetArena()
@ -188,7 +171,7 @@ public:
void insert(const Field & x) override
{
IAggregateFunction * function = holder.get()->func.get();
IAggregateFunction * function = func.get();
Arena & arena = createOrGetArena();
@ -200,7 +183,7 @@ public:
void insertDefault() override
{
IAggregateFunction * function = holder.get()->func.get();
IAggregateFunction * function = func.get();
Arena & arena = createOrGetArena();
@ -247,7 +230,14 @@ public:
void popBack(size_t n) override
{
holder.get()->popBack(n);
size_t size = data.size();
size_t new_size = size - n;
if (!src)
for (size_t i = new_size; i < size; ++i)
func->destroy(data[i]);
data.resize_assume_reserved(new_size);
}
ColumnPtr filter(const Filter & filter, ssize_t result_size_hint) const override
@ -324,12 +314,12 @@ public:
/** Более эффективные методы манипуляции */
Container_t & getData()
{
return holder.get()->data;
return data;
}
const Container_t & getData() const
{
return holder.get()->data;
return data;
}
};

View File

@ -11,7 +11,7 @@ class Context;
/** Позволяет получить функцию по имени.
* Функция при создании также может использовать для инициализации (например, захватить SharedPtr)
* Функция при создании также может использовать для инициализации (например, захватить shared_ptr)
* какие-нибудь справочники, находящиеся в Context-е.
*/
class FunctionFactory : public Singleton<FunctionFactory>

View File

@ -36,7 +36,7 @@ private:
StorageReplicatedMergeTree & storage;
Logger * log;
zkutil::EventPtr wakeup_event { new Poco::Event };
zkutil::EventPtr wakeup_event { std::make_shared<Poco::Event>() };
volatile bool need_stop { false };
std::thread thread;

View File

@ -387,7 +387,7 @@ private:
Context & context;
Logger * log = &Logger::get("ReshardingWorker");
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
zkutil::GetZooKeeper get_zookeeper;
std::atomic<bool> is_started{false};

View File

@ -275,7 +275,7 @@ private:
/// Поток, следящий за обновлениями в логах всех реплик и загружающий их в очередь.
std::thread queue_updating_thread;
zkutil::EventPtr queue_updating_event = zkutil::EventPtr(new Poco::Event);
zkutil::EventPtr queue_updating_event = std::make_shared<Poco::Event>();
/// Задание, выполняющее действия из очереди.
BackgroundProcessingPool::TaskHandle queue_task_handle;
@ -298,7 +298,7 @@ private:
ReplicatedMergeTreePartCheckThread part_check_thread;
/// Событие, пробуждающее метод alter от ожидания завершения запроса ALTER.
zkutil::EventPtr alter_query_event = zkutil::EventPtr(new Poco::Event);
zkutil::EventPtr alter_query_event = std::make_shared<Poco::Event>();
Logger * log;

View File

@ -7,7 +7,7 @@ namespace DB
ColumnPtr ColumnAggregateFunction::convertToValues() const
{
const IAggregateFunction * function = holder->func.get();
const IAggregateFunction * function = func.get();
ColumnPtr res = function->getReturnType()->createColumn();
/** Если агрегатная функция возвращает нефинализированное состояние,

View File

@ -2217,7 +2217,7 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
dst.insert({src_col.column->filter(filter, size_hint), src_col.type, src_col.name});
/** Вставленные в блок столбцы типа ColumnAggregateFunction будут владеть состояниями агрегатных функций
* путём удержания SharedPtr-а на исходный столбец. См. ColumnAggregateFunction.h
* путём удержания shared_ptr-а на исходный столбец. См. ColumnAggregateFunction.h
*/
}
}

View File

@ -313,7 +313,7 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
{
while (true)
{
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string value;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.

View File

@ -193,7 +193,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
activateReplica();
updateQuorumIfWeHavePart();
storage.leader_election = new zkutil::LeaderElection(
storage.leader_election = std::make_shared<zkutil::LeaderElection>(
storage.zookeeper_path + "/leader_election",
*storage.current_zookeeper, /// current_zookeeper живёт в течение времени жизни leader_election,
/// так как до изменения current_zookeeper, объект leader_election уничтожается в методе partialShutdown.

View File

@ -2637,7 +2637,7 @@ void ReshardingWorker::AnomalyMonitor::routine()
/// We create a new instance of Poco::Event each time we run
/// the loop body in order to avoid multiple notifications.
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
/// Monitor both status changes and movements of the performers.
(void) zookeeper->get(resharding_worker.getCoordinatorPath(coordinator_id)

View File

@ -697,7 +697,7 @@ void StorageReplicatedMergeTree::createReplica()
{
LOG_INFO(log, "Waiting for replica " << source_path << " to be fully created");
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
if (zookeeper->exists(source_path + "/columns", nullptr, event))
{
LOG_WARNING(log, "Oops, a watch has leaked");
@ -2875,7 +2875,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)
@ -2920,7 +2920,7 @@ void StorageReplicatedMergeTree::waitForReplicaToProcessLogEntry(const String &
/// Дождемся, пока запись попадет в очередь реплики.
while (true)
{
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
String log_pointer = zookeeper->get(zookeeper_path + "/replicas/" + replica + "/log_pointer", nullptr, event);
if (!log_pointer.empty() && parse<UInt64>(log_pointer) > log_index)

View File

@ -24,7 +24,7 @@ template <typename T, typename Ptr = std::shared_ptr<T>>
class MultiVersion
{
public:
/// Конкретная версия объекта для использования. SharedPtr определяет время жизни версии.
/// Конкретная версия объекта для использования. shared_ptr определяет время жизни версии.
using Version = Ptr;
/// Инициализация по-умолчанию (NULL-ом).
@ -47,10 +47,10 @@ public:
set(std::move(value));
}
/// Получить текущую версию для использования. Возвращает SharedPtr, который определяет время жизни версии.
/// Получить текущую версию для использования. Возвращает shared_ptr, который определяет время жизни версии.
const Version get() const
{
/// TODO: можно ли заменять SharedPtr lock-free? (Можно, если сделать свою реализацию с использованием cmpxchg16b.)
/// TODO: можно ли заменять shared_ptr lock-free? (Можно, если сделать свою реализацию с использованием cmpxchg16b.)
std::lock_guard<std::mutex> lock(mutex);
return current_version;
}

View File

@ -10,7 +10,6 @@
#include <Poco/NumberFormatter.h>
#include <Poco/Mutex.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <common/logger_useful.h>
#include <mysqlxx/Connection.h>

View File

@ -64,7 +64,7 @@ namespace mysqlxx
class PoolWithFailover final
{
private:
using PoolPtr = Poco::SharedPtr<Pool>;
using PoolPtr = std::shared_ptr<Pool>;
struct Replica
{
@ -86,7 +86,7 @@ namespace mysqlxx
/// Количество попыток подключения.
size_t max_tries;
/// Mutex для доступа к списку реплик.
Poco::FastMutex mutex;
std::mutex mutex;
public:
using Entry = Pool::Entry;

View File

@ -16,7 +16,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
if (*it == "replica") /// На том же уровне могут быть другие параметры.
{
std::string replica_name = config_name + "." + *it;
Replica replica(new Pool(cfg, replica_name, default_connections, max_connections, config_name.c_str()),
Replica replica(std::make_shared<Pool>(cfg, replica_name, default_connections, max_connections, config_name.c_str()),
cfg.getInt(replica_name + ".priority", 0));
replicas_by_priority[replica.priority].push_back(replica);
}
@ -24,7 +24,7 @@ PoolWithFailover::PoolWithFailover(const Poco::Util::AbstractConfiguration & cfg
}
else
{
replicas_by_priority[0].push_back(Replica(new Pool(cfg, config_name, default_connections, max_connections), 0));
replicas_by_priority[0].push_back(Replica(std::make_shared<Pool>(cfg, config_name, default_connections, max_connections), 0));
}
}
@ -44,14 +44,14 @@ PoolWithFailover::PoolWithFailover(const PoolWithFailover & other)
Replicas replicas;
replicas.reserve(replica_with_priority.second.size());
for (const auto & replica : replica_with_priority.second)
replicas.emplace_back(new Pool{*replica.pool}, replica.priority);
replicas.emplace_back(std::make_shared<Pool>(*replica.pool), replica.priority);
replicas_by_priority.emplace(replica_with_priority.first, std::move(replicas));
}
}
PoolWithFailover::Entry PoolWithFailover::Get()
{
Poco::ScopedLock<Poco::FastMutex> locker(mutex);
std::lock_guard<std::mutex> locker(mutex);
Poco::Util::Application & app = Poco::Util::Application::instance();
/// Если к какой-то реплике не подключились, потому что исчерпан лимит соединений, можно подождать и подключиться к ней.

View File

@ -2,6 +2,7 @@
#include <zkutil/ZooKeeper.h>
#include <functional>
#include <memory>
#include <common/logger_useful.h>
@ -79,7 +80,7 @@ private:
std::thread thread;
volatile bool shutdown = false;
zkutil::EventPtr event = new Poco::Event();
zkutil::EventPtr event = std::make_shared<Poco::Event>();
State state = WAITING_LEADERSHIP;
@ -122,6 +123,6 @@ private:
}
};
using LeaderElectionPtr = Poco::SharedPtr<LeaderElection>;
using LeaderElectionPtr = std::shared_ptr<LeaderElection>;
}

View File

@ -70,7 +70,7 @@ private:
private:
GetZooKeeper get_zookeeper;
EventPtr event = new Poco::Event;
EventPtr event = std::make_shared<Poco::Event>();
CancellationHook cancellation_hook;
/// Path to the lock request queue.
std::string path;

View File

@ -33,7 +33,7 @@ private:
private:
GetZooKeeper get_zookeeper;
EventPtr event = new Poco::Event;
EventPtr event = std::make_shared<Poco::Event>();
CancellationHook cancellation_hook;
std::string path;
std::string token;

View File

@ -2,10 +2,10 @@
#include <common/Common.h>
#include <boost/function.hpp>
#include <future>
#include <memory>
#include <boost/noncopyable.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <zookeeper/zookeeper.h>
#include <Poco/SharedPtr.h>
#include <Poco/Event.h>
namespace zkutil
@ -97,6 +97,6 @@ namespace CreateMode
extern const int PersistentSequential;
}
using EventPtr = Poco::SharedPtr<Poco::Event>;
using EventPtr = std::shared_ptr<Poco::Event>;
}

View File

@ -586,7 +586,7 @@ void ZooKeeper::waitForDisappear(const std::string & path)
{
while (true)
{
zkutil::EventPtr event = new Poco::Event;
zkutil::EventPtr event = std::make_shared<Poco::Event>();
std::string unused;
/// get вместо exists, чтобы не утек watch, если ноды уже нет.

View File

@ -94,7 +94,7 @@ int main(int argc, char ** argv)
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
std::vector<std::string> v = zk.getChildren(path, nullptr, event);
for (size_t i = 0; i < v.size(); ++i)
{
@ -148,7 +148,7 @@ int main(int argc, char ** argv)
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
zkutil::Stat stat;
bool e = zk.exists(path, &stat, event);
if (e)
@ -163,7 +163,7 @@ int main(int argc, char ** argv)
std::string w;
ss >> w;
bool watch = w == "w";
zkutil::EventPtr event = watch ? new Poco::Event() : nullptr;
zkutil::EventPtr event = watch ? std::make_shared<Poco::Event>() : nullptr;
zkutil::Stat stat;
std::string data = zk.get(path, &stat, event);
std::cout << "Data: " << data << std::endl;

View File

@ -17,7 +17,7 @@ int main()
std::cout << "create path" << std::endl;
zk.create("/test", "old", zkutil::CreateMode::Persistent);
zkutil::Stat stat;
zkutil::EventPtr watch = new Poco::Event;
zkutil::EventPtr watch = std::make_shared<Poco::Event>();
std::cout << "get path" << std::endl;
zk.get("/test", &stat, watch);

View File

@ -10,7 +10,6 @@
#include <Poco/NumberParser.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <DB/Common/Exception.h>

View File

@ -10,7 +10,6 @@
#include <Poco/NumberParser.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <DB/Common/Exception.h>

View File

@ -13,7 +13,6 @@
#include <Poco/NumberParser.h>
#include <Poco/NumberFormatter.h>
#include <Poco/Exception.h>
#include <Poco/SharedPtr.h>
#include <DB/Common/Exception.h>