Changed boost:: to std:: for bind, ref/cref, thread, shared_ptr [#METR-2807].

This commit is contained in:
Alexey Milovidov 2014-10-16 05:21:03 +04:00
parent afc4f7fcf0
commit c2e4662ad7
21 changed files with 118 additions and 122 deletions

View File

@ -113,7 +113,7 @@ protected:
void next() void next()
{ {
ready.reset(); ready.reset();
pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker)); pool.schedule(std::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker));
} }

View File

@ -37,7 +37,7 @@ public:
: has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size())) : has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size()))
{ {
children.insert(children.end(), inputs_.begin(), inputs_.end()); children.insert(children.end(), inputs_.begin(), inputs_.end());
aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_); aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_);
} }
@ -76,8 +76,8 @@ protected:
for (size_t i = 0, size = many_data.size(); i < size; ++i) for (size_t i = 0, size = many_data.size(); i < size; ++i)
{ {
many_data[i] = new AggregatedDataVariants; many_data[i] = new AggregatedDataVariants;
pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, pool.schedule(std::bind(&ParallelAggregatingBlockInputStream::calculate, this,
boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i]), current_memory_tracker)); std::ref(children[i]), std::ref(*many_data[i]), std::ref(exceptions[i]), current_memory_tracker));
} }
pool.wait(); pool.wait();

View File

@ -16,7 +16,7 @@ namespace DB
{ {
using Poco::SharedPtr; using Poco::SharedPtr;
/** Записывает данные асинхронно с помощью двойной буферизации. /** Записывает данные асинхронно с помощью двойной буферизации.
*/ */
@ -51,7 +51,7 @@ private:
swapBuffers(); swapBuffers();
/// Данные будут записываться в отельном потоке. /// Данные будут записываться в отельном потоке.
pool.schedule(boost::bind(&AsynchronousWriteBuffer::thread, this)); pool.schedule([this] { thread(); });
} }
public: public:

View File

@ -1,6 +1,6 @@
#pragma once #pragma once
#include <boost/thread.hpp> #include <thread>
#include <Poco/SharedPtr.h> #include <Poco/SharedPtr.h>
@ -30,7 +30,7 @@ private:
/// Периодичность обновления справочников, в секундах. /// Периодичность обновления справочников, в секундах.
int reload_period; int reload_period;
boost::thread reloading_thread; std::thread reloading_thread;
Poco::Event destroy; Poco::Event destroy;
Logger * log; Logger * log;
@ -139,7 +139,7 @@ public:
log(&Logger::get("Dictionaries")) log(&Logger::get("Dictionaries"))
{ {
reloadImpl(); reloadImpl();
reloading_thread = boost::thread(&Dictionaries::reloadPeriodically, this); reloading_thread = std::thread([this] { reloadPeriodically(); });
} }
~Dictionaries() ~Dictionaries()
@ -157,12 +157,12 @@ public:
{ {
return tech_data_hierarchy.get(); return tech_data_hierarchy.get();
} }
MultiVersion<CategoriesHierarchy>::Version getCategoriesHierarchy() const MultiVersion<CategoriesHierarchy>::Version getCategoriesHierarchy() const
{ {
return categories_hierarchy.get(); return categories_hierarchy.get();
} }
MultiVersion<RegionsNames>::Version getRegionsNames() const MultiVersion<RegionsNames>::Version getRegionsNames() const
{ {
return regions_names.get(); return regions_names.get();

View File

@ -8,7 +8,7 @@
namespace DB namespace DB
{ {
/** То и дело объединяет таблицы, подходящие под регэксп, в таблицы типа Chunks. /** То и дело объединяет таблицы, подходящие под регэксп, в таблицы типа Chunks.
* После объндинения заменяет исходные таблицы таблицами типа ChunkRef. * После объндинения заменяет исходные таблицы таблицами типа ChunkRef.
* При чтении ведет себя как таблица типа Merge. * При чтении ведет себя как таблица типа Merge.
@ -26,10 +26,10 @@ public:
const std::string & destination_name_prefix_, /// Префикс имен создаваемых таблиц типа Chunks. const std::string & destination_name_prefix_, /// Префикс имен создаваемых таблиц типа Chunks.
size_t chunks_to_merge_, /// Сколько чанков сливать в одну группу. size_t chunks_to_merge_, /// Сколько чанков сливать в одну группу.
Context & context_); /// Известные таблицы. Context & context_); /// Известные таблицы.
std::string getName() const override { return "ChunkMerger"; } std::string getName() const override { return "ChunkMerger"; }
std::string getTableName() const override { return name; } std::string getTableName() const override { return name; }
const NamesAndTypesList & getColumnsList() const override { return *columns; } const NamesAndTypesList & getColumnsList() const override { return *columns; }
NameAndTypePair getColumn(const String & column_name) const override; NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override; bool hasColumn(const String & column_name) const override;
@ -45,7 +45,7 @@ public:
void shutdown() override; void shutdown() override;
~StorageChunkMerger() override; ~StorageChunkMerger() override;
private: private:
String this_database; String this_database;
String name; String name;
@ -56,13 +56,13 @@ private:
size_t chunks_to_merge; size_t chunks_to_merge;
Context & context; Context & context;
Settings settings; Settings settings;
boost::thread merge_thread; std::thread merge_thread;
Poco::Event cancel_merge_thread; Poco::Event cancel_merge_thread;
Logger * log; Logger * log;
volatile bool shutdown_called; volatile bool shutdown_called;
/// Название виртуального столбца, отвечающего за имя таблицы, из которой идет чтение. (Например "_table") /// Название виртуального столбца, отвечающего за имя таблицы, из которой идет чтение. (Например "_table")
String _table_column_name; String _table_column_name;
@ -88,5 +88,5 @@ private:
/// Нужно смотреть, залочив mutex из контекста. /// Нужно смотреть, залочив mutex из контекста.
static TableNames currently_written_groups; static TableNames currently_written_groups;
}; };
} }

View File

@ -1,15 +1,15 @@
#include <iostream> #include <iostream>
#include <stdlib.h> #include <stdlib.h>
#include <vector> #include <vector>
#include <boost/shared_ptr.hpp> #include <memory>
struct C struct C
{ {
volatile int data; volatile int data;
C(int n = 0) : data(n) {} C(int n = 0) : data(n) {}
C(const C & x) C(const C & x)
{ {
*this = x; *this = x;
@ -37,7 +37,7 @@ C f2()
{ {
C x; C x;
++x.data; ++x.data;
return x; return x;
} }
@ -45,7 +45,7 @@ C f3()
{ {
if (rand() % 10 == 0) if (rand() % 10 == 0)
return C(123); return C(123);
C x; C x;
++x.data; ++x.data;
@ -154,7 +154,7 @@ C f12()
C f13() C f13()
{ {
C x; C x;
if (rand() % 2) if (rand() % 2)
x = f1(); x = f1();
else else
@ -218,7 +218,7 @@ struct IFactory
virtual ~IFactory() {} virtual ~IFactory() {}
}; };
typedef boost::shared_ptr<IFactory> FactoryPtr; typedef std::unique_ptr<IFactory> FactoryPtr;
struct Factory1 : IFactory struct Factory1 : IFactory
@ -241,7 +241,7 @@ struct Factory3 : IFactory
factory = FactoryPtr(new Factory1); factory = FactoryPtr(new Factory1);
else else
factory = FactoryPtr(new Factory2); factory = FactoryPtr(new Factory2);
return factory->get(); return factory->get();
} }
}; };
@ -250,7 +250,7 @@ struct Factory3 : IFactory
int main(int argc, char ** argv) int main(int argc, char ** argv)
{ {
srand(time(0)); srand(time(0));
std::cerr << "f1: " << f1().data << std::endl; std::cerr << "f1: " << f1().data << std::endl;
std::cerr << "f2: " << f2().data << std::endl; std::cerr << "f2: " << f2().data << std::endl;
std::cerr << "f3: " << f3().data << std::endl; std::cerr << "f3: " << f3().data << std::endl;

View File

@ -109,8 +109,8 @@ int main(int argc, char ** argv)
BlockOutputStreamPtr out1 = format_factory.getOutput("TabSeparated", ob1, out_sample); BlockOutputStreamPtr out1 = format_factory.getOutput("TabSeparated", ob1, out_sample);
BlockOutputStreamPtr out2 = format_factory.getOutput("TabSeparated", ob2, out_sample); BlockOutputStreamPtr out2 = format_factory.getOutput("TabSeparated", ob2, out_sample);
boost::thread thr1(thread1, in1, out1, boost::ref(ob1)); std::thread thr1(std::bind(thread1, in1, out1, std::ref(ob1)));
boost::thread thr2(thread2, in2, out2, boost::ref(ob2)); std::thread thr2(std::bind(thread2, in2, out2, std::ref(ob2)));
fork.run(); fork.run();

View File

@ -84,11 +84,11 @@ int main(int argc, char ** argv)
boost::threadpool::pool pool(inputs.size() + forks.size()); boost::threadpool::pool pool(inputs.size() + forks.size());
pool.schedule(boost::bind(inputThread, inputs[0], out1, boost::ref(wb), boost::ref(mutex))); pool.schedule(std::bind(inputThread, inputs[0], out1, std::ref(wb), std::ref(mutex)));
pool.schedule(boost::bind(inputThread, inputs[1], out2, boost::ref(wb), boost::ref(mutex))); pool.schedule(std::bind(inputThread, inputs[1], out2, std::ref(wb), std::ref(mutex)));
for (size_t i = 0; i < forks.size(); ++i) for (size_t i = 0; i < forks.size(); ++i)
pool.schedule(boost::bind(forkThread, forks[i])); pool.schedule(std::bind(forkThread, forks[i]));
pool.wait(); pool.wait();
} }

View File

@ -1,5 +1,3 @@
#include <boost/bind.hpp>
#include <DB/Columns/ColumnArray.h> #include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnConst.h> #include <DB/Columns/ColumnConst.h>

View File

@ -1,5 +1,3 @@
#include <boost/bind.hpp>
#include <DB/Columns/ColumnArray.h> #include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnConst.h> #include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnNested.h> #include <DB/Columns/ColumnNested.h>

View File

@ -3,7 +3,6 @@
#include <Poco/Util/AbstractConfiguration.h> #include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h> #include <Poco/Util/Application.h>
#include <Poco/Net/NetworkInterface.h> #include <Poco/Net/NetworkInterface.h>
#include <boost/bind.hpp>
namespace DB namespace DB
{ {

View File

@ -17,8 +17,6 @@
#include <Poco/FileStream.h> #include <Poco/FileStream.h>
#include <algorithm> #include <algorithm>
#include <boost/bind.hpp>
#include <boost/bind/placeholders.hpp>
using namespace DB; using namespace DB;

View File

@ -87,11 +87,11 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
/// Параллельно вычисляем хэши и ключи. /// Параллельно вычисляем хэши и ключи.
for (size_t thread_no = 0; thread_no < threads; ++thread_no) for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::calculateHashesThread, this, pool.schedule(std::bind(&SplittingAggregator::calculateHashesThread, this,
boost::ref(block), std::ref(block),
rows * thread_no / threads, rows * thread_no / threads,
rows * (thread_no + 1) / threads, rows * (thread_no + 1) / threads,
boost::ref(exceptions[thread_no]), std::ref(exceptions[thread_no]),
current_memory_tracker)); current_memory_tracker));
pool.wait(); pool.wait();
@ -101,11 +101,11 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData
/// Параллельно агрегируем в независимые хэш-таблицы /// Параллельно агрегируем в независимые хэш-таблицы
for (size_t thread_no = 0; thread_no < threads; ++thread_no) for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::aggregateThread, this, pool.schedule(std::bind(&SplittingAggregator::aggregateThread, this,
boost::ref(block), std::ref(block),
boost::ref(*results[thread_no]), std::ref(*results[thread_no]),
thread_no, thread_no,
boost::ref(exceptions[thread_no]), std::ref(exceptions[thread_no]),
current_memory_tracker)); current_memory_tracker));
pool.wait(); pool.wait();
@ -131,11 +131,11 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari
/// Параллельно конвертируем в блоки. /// Параллельно конвертируем в блоки.
for (size_t thread_no = 0; thread_no < threads; ++thread_no) for (size_t thread_no = 0; thread_no < threads; ++thread_no)
pool.schedule(boost::bind(&SplittingAggregator::convertToBlockThread, this, pool.schedule(std::bind(&SplittingAggregator::convertToBlockThread, this,
boost::ref(*data_variants[thread_no]), std::ref(*data_variants[thread_no]),
boost::ref(blocks[thread_no]), std::ref(blocks[thread_no]),
final, final,
boost::ref(exceptions[thread_no]), std::ref(exceptions[thread_no]),
current_memory_tracker)); current_memory_tracker));
pool.wait(); pool.wait();

View File

@ -6,7 +6,6 @@
#include <DB/Parsers/ASTIdentifier.h> #include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h> #include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Interpreters/Context.h> #include <DB/Interpreters/Context.h>
#include <boost/bind.hpp>
namespace DB namespace DB
{ {

View File

@ -1,6 +1,7 @@
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <Yandex/time2str.h> #include <Yandex/time2str.h>
#include <Poco/Ext/ScopedTry.h> #include <Poco/Ext/ScopedTry.h>
#include <DB/Storages/MergeTree/MergeTreeData.h>
#include <DB/Interpreters/ExpressionAnalyzer.h> #include <DB/Interpreters/ExpressionAnalyzer.h>
#include <DB/Storages/MergeTree/MergeTreeReader.h> #include <DB/Storages/MergeTree/MergeTreeReader.h>
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h> #include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
@ -12,7 +13,9 @@
#include <DB/DataStreams/copyData.h> #include <DB/DataStreams/copyData.h>
#include <DB/IO/WriteBufferFromFile.h> #include <DB/IO/WriteBufferFromFile.h>
#include <DB/DataTypes/DataTypeDate.h> #include <DB/DataTypes/DataTypeDate.h>
#include <algorithm> #include <algorithm>
#include <iomanip>

View File

@ -232,7 +232,7 @@ StorageChunkMerger::StorageChunkMerger(
context(context_), settings(context.getSettings()), context(context_), settings(context.getSettings()),
log(&Logger::get("StorageChunkMerger")), shutdown_called(false) log(&Logger::get("StorageChunkMerger")), shutdown_called(false)
{ {
merge_thread = boost::thread(&StorageChunkMerger::mergeThread, this); merge_thread = std::thread([this] { mergeThread(); });
_table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table"); _table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table");
} }

View File

@ -1,7 +1,7 @@
#include <string.h> #include <string.h>
#include <iostream> #include <iostream>
#include <statdaemons/threadpool.hpp> #include <statdaemons/threadpool.hpp>
#include <boost/bind.hpp> #include <functional>
#include <Yandex/MultiVersion.h> #include <Yandex/MultiVersion.h>
@ -28,7 +28,7 @@ int main(int argc, char ** argv)
{ {
const char * s1 = "Hello!"; const char * s1 = "Hello!";
const char * s2 = "Goodbye!"; const char * s2 = "Goodbye!";
size_t n = 1000; size_t n = 1000;
MV x(new T(s1)); MV x(new T(s1));
Results results(n); Results results(n);
@ -36,8 +36,8 @@ int main(int argc, char ** argv)
boost::threadpool::pool tp(8); boost::threadpool::pool tp(8);
for (size_t i = 0; i < n; ++i) for (size_t i = 0; i < n; ++i)
{ {
tp.schedule(boost::bind(thread1, boost::ref(x), boost::ref(results[i]))); tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(boost::bind(thread2, boost::ref(x), (rand() % 2) ? s1 : s2)); tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
} }
tp.wait(); tp.wait();
@ -50,6 +50,6 @@ int main(int argc, char ** argv)
std::cerr << e.message() << std::endl; std::cerr << e.message() << std::endl;
throw; throw;
} }
return 0; return 0;
} }

View File

@ -175,7 +175,7 @@ private:
static void processEvent(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx); static void processEvent(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx);
template <class T> template <class T>
int32_t retry(const T & operation, size_t * attempt = nullptr) int32_t retry(T && operation, size_t * attempt = nullptr)
{ {
int32_t code = operation(); int32_t code = operation();
if (attempt) if (attempt)
@ -187,7 +187,7 @@ private:
/// если потеряно соединение подождем timeout/3, авось восстановится /// если потеряно соединение подождем timeout/3, авось восстановится
if (code == ZCONNECTIONLOSS) if (code == ZCONNECTIONLOSS)
usleep(sessionTimeoutMs*1000/3); usleep(sessionTimeoutMs * 1000 / 3);
LOG_WARNING(log, "Error on attempt " << i << ": " << error2string(code) << ". Retry"); LOG_WARNING(log, "Error on attempt " << i << ": " << error2string(code) << ". Retry");
code = operation(); code = operation();

View File

@ -1,8 +1,9 @@
#include <functional>
#include <zkutil/ZooKeeper.h> #include <zkutil/ZooKeeper.h>
#include <boost/make_shared.hpp> #include <boost/make_shared.hpp>
#include <Yandex/logger_useful.h> #include <Yandex/logger_useful.h>
#include <DB/Common/ProfileEvents.h> #include <DB/Common/ProfileEvents.h>
#include <boost/bind.hpp>
namespace zkutil namespace zkutil
{ {
@ -176,7 +177,7 @@ Strings ZooKeeper::getChildren(
int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res,
Stat * stat_, EventPtr watch) Stat * stat_, EventPtr watch)
{ {
int32_t code = retry(boost::bind(&ZooKeeper::getChildrenImpl, this, boost::ref(path), boost::ref(res), stat_, watch)); int32_t code = retry(std::bind(&ZooKeeper::getChildrenImpl, this, std::ref(path), std::ref(res), stat_, watch));
if (!( code == ZOK || if (!( code == ZOK ||
code == ZNONODE)) code == ZNONODE))
@ -232,16 +233,16 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data,
return tryCreate(path, data, mode, pathCreated); return tryCreate(path, data, mode, pathCreated);
} }
int32_t ZooKeeper::tryCreateWithRetries(const std::string& path, const std::string& data, int32_t mode, std::string& pathCreated, size_t* attempt) int32_t ZooKeeper::tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated, size_t* attempt)
{ {
return retry(boost::bind(&ZooKeeper::tryCreate, this, boost::ref(path), boost::ref(data), mode, boost::ref(pathCreated)), attempt); return retry([&path, &data, mode, &pathCreated, this] { return tryCreate(path, data, mode, pathCreated); });
} }
void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data) void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data)
{ {
std::string pathCreated; std::string pathCreated;
int32_t code = retry(boost::bind(&ZooKeeper::createImpl, this, boost::ref(path), boost::ref(data), zkutil::CreateMode::Persistent, boost::ref(pathCreated))); int32_t code = retry(std::bind(&ZooKeeper::createImpl, this, std::ref(path), std::ref(data), zkutil::CreateMode::Persistent, std::ref(pathCreated)));
if (code == ZOK || code == ZNODEEXISTS) if (code == ZOK || code == ZNODEEXISTS)
return; return;
@ -288,7 +289,7 @@ int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version)
int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t version, size_t * attempt) int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t version, size_t * attempt)
{ {
int32_t code = retry(boost::bind(&ZooKeeper::removeImpl, this, boost::ref(path), version), attempt); int32_t code = retry(std::bind(&ZooKeeper::removeImpl, this, std::ref(path), version), attempt);
if (!( code == ZOK || if (!( code == ZOK ||
code == ZNONODE || code == ZNONODE ||
code == ZBADVERSION || code == ZBADVERSION ||
@ -316,7 +317,7 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, EventPtr w
bool ZooKeeper::exists(const std::string & path, Stat * stat_, EventPtr watch) bool ZooKeeper::exists(const std::string & path, Stat * stat_, EventPtr watch)
{ {
int32_t code = retry(boost::bind(&ZooKeeper::existsImpl, this, path, stat_, watch)); int32_t code = retry(std::bind(&ZooKeeper::existsImpl, this, path, stat_, watch));
if (!( code == ZOK || if (!( code == ZOK ||
code == ZNONODE)) code == ZNONODE))
@ -360,7 +361,7 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, EventPtr watch
bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, EventPtr watch) bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, EventPtr watch)
{ {
int32_t code = retry(boost::bind(&ZooKeeper::getImpl, this, boost::ref(path), boost::ref(res), stat_, watch)); int32_t code = retry(std::bind(&ZooKeeper::getImpl, this, std::ref(path), std::ref(res), stat_, watch));
if (!( code == ZOK || if (!( code == ZOK ||
code == ZNONODE)) code == ZNONODE))
@ -455,7 +456,7 @@ int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt) int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt)
{ {
int32_t code = retry(boost::bind(&ZooKeeper::multiImpl, this, boost::ref(ops), out_results), attempt); int32_t code = retry(std::bind(&ZooKeeper::multiImpl, this, std::ref(ops), out_results), attempt);
if (!(code == ZOK || if (!(code == ZOK ||
code == ZNONODE || code == ZNONODE ||
code == ZNODEEXISTS || code == ZNODEEXISTS ||

View File

@ -42,7 +42,7 @@ struct AlignedBuffer
{ {
int size; int size;
char * data; char * data;
AlignedBuffer(int size_) AlignedBuffer(int size_)
{ {
size_t page = sysconf(_SC_PAGESIZE); size_t page = sysconf(_SC_PAGESIZE);
@ -51,7 +51,7 @@ struct AlignedBuffer
if (!data) if (!data)
throwFromErrno("memalign failed"); throwFromErrno("memalign failed");
} }
~AlignedBuffer() ~AlignedBuffer()
{ {
free(data); free(data);
@ -63,15 +63,15 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
try try
{ {
AlignedBuffer direct_buf(block_size); AlignedBuffer direct_buf(block_size);
std::vector<char> simple_buf(block_size); std::vector<char> simple_buf(block_size);
char * buf; char * buf;
if ((mode & MODE_DIRECT)) if ((mode & MODE_DIRECT))
buf = direct_buf.data; buf = direct_buf.data;
else else
buf = &simple_buf[0]; buf = &simple_buf[0];
drand48_data rand_data; drand48_data rand_data;
timespec times; timespec times;
@ -86,7 +86,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
lrand48_r(&rand_data, &rand_result1); lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2); lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3); lrand48_r(&rand_data, &rand_result3);
for (size_t j = 0; j + 3 < block_size; j += 3) for (size_t j = 0; j + 3 < block_size; j += 3)
{ {
long r; long r;
@ -149,7 +149,7 @@ int mainImpl(int argc, char ** argv)
block_size = Poco::NumberParser::parseUnsigned64(argv[5]); block_size = Poco::NumberParser::parseUnsigned64(argv[5]);
threads = Poco::NumberParser::parseUnsigned(argv[6]); threads = Poco::NumberParser::parseUnsigned(argv[6]);
count = Poco::NumberParser::parseUnsigned(argv[7]); count = Poco::NumberParser::parseUnsigned(argv[7]);
for (int i = 0; argv[2][i]; ++i) for (int i = 0; argv[2][i]; ++i)
{ {
char c = argv[2][i]; char c = argv[2][i];
@ -174,7 +174,7 @@ int mainImpl(int argc, char ** argv)
throw Poco::Exception("Invalid mode"); throw Poco::Exception("Invalid mode");
} }
} }
boost::threadpool::pool pool(threads); boost::threadpool::pool pool(threads);
int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_DIRECT) ? O_DIRECT : 0) | ((mode & MODE_SYNC) ? O_SYNC : 0)); int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_DIRECT) ? O_DIRECT : 0) | ((mode & MODE_SYNC) ? O_SYNC : 0));
@ -185,11 +185,11 @@ int mainImpl(int argc, char ** argv)
Exceptions exceptions(threads); Exceptions exceptions(threads);
Stopwatch watch; Stopwatch watch;
for (size_t i = 0; i < threads; ++i) for (size_t i = 0; i < threads; ++i)
pool.schedule(boost::bind(thread, fd, mode, min_offset, max_offset, block_size, count, boost::ref(exceptions[i]))); pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count, std::ref(exceptions[i])));
pool.wait(); pool.wait();
fsync(fd); fsync(fd);
for (size_t i = 0; i < threads; ++i) for (size_t i = 0; i < threads; ++i)
@ -213,7 +213,7 @@ int mainImpl(int argc, char ** argv)
<< ", " << count * threads / watch.elapsedSeconds() << " ops/sec." << ", " << count * threads / watch.elapsedSeconds() << " ops/sec."
<< ", " << count * threads * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." << ", " << count * threads * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
<< std::endl; << std::endl;
return 0; return 0;
} }

View File

@ -35,12 +35,12 @@ inline int io_setup(unsigned nr, aio_context_t *ctxp)
return syscall(__NR_io_setup, nr, ctxp); return syscall(__NR_io_setup, nr, ctxp);
} }
inline int io_destroy(aio_context_t ctx) inline int io_destroy(aio_context_t ctx)
{ {
return syscall(__NR_io_destroy, ctx); return syscall(__NR_io_destroy, ctx);
} }
inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp)
{ {
return syscall(__NR_io_submit, ctx, nr, iocbpp); return syscall(__NR_io_submit, ctx, nr, iocbpp);
} }
@ -62,9 +62,9 @@ struct AlignedBuffer
{ {
int size = 0; int size = 0;
char * data = nullptr; char * data = nullptr;
AlignedBuffer() {} AlignedBuffer() {}
void init(int size_) void init(int size_)
{ {
uninit(); uninit();
@ -74,7 +74,7 @@ struct AlignedBuffer
if (!data) if (!data)
throwFromErrno("memalign failed"); throwFromErrno("memalign failed");
} }
void uninit() void uninit()
{ {
if (data) if (data)
@ -82,12 +82,12 @@ struct AlignedBuffer
data = nullptr; data = nullptr;
size = 0; size = 0;
} }
AlignedBuffer(int size_) : size(0), data(NULL) AlignedBuffer(int size_) : size(0), data(NULL)
{ {
init(size_); init(size_);
} }
~AlignedBuffer() ~AlignedBuffer()
{ {
uninit(); uninit();
@ -97,14 +97,14 @@ struct AlignedBuffer
struct AioContext struct AioContext
{ {
aio_context_t ctx; aio_context_t ctx;
AioContext() AioContext()
{ {
ctx = 0; ctx = 0;
if (io_setup(128, &ctx) < 0) if (io_setup(128, &ctx) < 0)
throwFromErrno("io_setup failed"); throwFromErrno("io_setup failed");
} }
~AioContext() ~AioContext()
{ {
io_destroy(ctx); io_destroy(ctx);
@ -119,25 +119,25 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
try try
{ {
AioContext ctx; AioContext ctx;
std::vector<AlignedBuffer> buffers(buffers_count); std::vector<AlignedBuffer> buffers(buffers_count);
for (size_t i = 0; i < buffers_count; ++i) for (size_t i = 0; i < buffers_count; ++i)
{ {
buffers[i].init(block_size); buffers[i].init(block_size);
} }
drand48_data rand_data; drand48_data rand_data;
timespec times; timespec times;
clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times); clock_gettime(CLOCK_THREAD_CPUTIME_ID, &times);
srand48_r(times.tv_nsec, &rand_data); srand48_r(times.tv_nsec, &rand_data);
size_t in_progress = 0; size_t in_progress = 0;
size_t blocks_sent = 0; size_t blocks_sent = 0;
std::vector<bool> buffer_used(buffers_count, false); std::vector<bool> buffer_used(buffers_count, false);
std::vector<iocb> iocbs(buffers_count); std::vector<iocb> iocbs(buffers_count);
std::vector<iocb*> query_cbs; std::vector<iocb*> query_cbs;
std::vector<io_event> events(buffers_count); std::vector<io_event> events(buffers_count);
while (blocks_sent < count || in_progress > 0) while (blocks_sent < count || in_progress > 0)
{ {
/// Составим запросы. /// Составим запросы.
@ -146,16 +146,16 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
{ {
if (blocks_sent >= count || in_progress >= buffers_count) if (blocks_sent >= count || in_progress >= buffers_count)
break; break;
if (buffer_used[i]) if (buffer_used[i])
continue; continue;
buffer_used[i] = true; buffer_used[i] = true;
++blocks_sent; ++blocks_sent;
++in_progress; ++in_progress;
char * buf = buffers[i].data; char * buf = buffers[i].data;
for (size_t j = 0; j + 3 < block_size; j += 3) for (size_t j = 0; j + 3 < block_size; j += 3)
{ {
long r; long r;
@ -164,17 +164,17 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
buf[j + 1] = static_cast<char>((r >> 8) & 255); buf[j + 1] = static_cast<char>((r >> 8) & 255);
buf[j + 2] = static_cast<char>((r >> 16) & 255); buf[j + 2] = static_cast<char>((r >> 16) & 255);
} }
long rand_result1 = 0; long rand_result1 = 0;
long rand_result2 = 0; long rand_result2 = 0;
long rand_result3 = 0; long rand_result3 = 0;
lrand48_r(&rand_data, &rand_result1); lrand48_r(&rand_data, &rand_result1);
lrand48_r(&rand_data, &rand_result2); lrand48_r(&rand_data, &rand_result2);
lrand48_r(&rand_data, &rand_result3); lrand48_r(&rand_data, &rand_result3);
size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43); size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43);
size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size; size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size;
iocb & cb = iocbs[i]; iocb & cb = iocbs[i];
memset(&cb, 0, sizeof(cb)); memset(&cb, 0, sizeof(cb));
cb.aio_buf = reinterpret_cast<uint64_t>(buf); cb.aio_buf = reinterpret_cast<uint64_t>(buf);
@ -182,7 +182,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
cb.aio_nbytes = block_size; cb.aio_nbytes = block_size;
cb.aio_offset = offset; cb.aio_offset = offset;
cb.aio_data = static_cast<uint64_t>(i); cb.aio_data = static_cast<uint64_t>(i);
if (mode == MODE_READ) if (mode == MODE_READ)
{ {
cb.aio_lio_opcode = IOCB_CMD_PREAD; cb.aio_lio_opcode = IOCB_CMD_PREAD;
@ -191,20 +191,20 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
{ {
cb.aio_lio_opcode = IOCB_CMD_PWRITE; cb.aio_lio_opcode = IOCB_CMD_PWRITE;
} }
query_cbs.push_back(&cb); query_cbs.push_back(&cb);
} }
/// Отправим запросы. /// Отправим запросы.
if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0) if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0)
throwFromErrno("io_submit failed"); throwFromErrno("io_submit failed");
/// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов. /// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов.
memset(&events[0], 0, buffers_count * sizeof(events[0])); memset(&events[0], 0, buffers_count * sizeof(events[0]));
int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr); int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr);
if (evs < 0) if (evs < 0)
throwFromErrno("io_getevents failed"); throwFromErrno("io_getevents failed");
for (int i = 0; i < evs; ++i) for (int i = 0; i < evs; ++i)
{ {
int b = static_cast<int>(events[i].data); int b = static_cast<int>(events[i].data);
@ -214,7 +214,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block
buffer_used[b] = false; buffer_used[b] = false;
} }
} }
// iocb cb; // iocb cb;
// memset(&cb, 0, sizeof(cb)); // memset(&cb, 0, sizeof(cb));
// cb.aio_lio_opcode = IOCB_CMD_FSYNC; // cb.aio_lio_opcode = IOCB_CMD_FSYNC;
@ -249,13 +249,13 @@ int mainImpl(int argc, char ** argv)
size_t buffers_count = 0; size_t buffers_count = 0;
size_t threads_count = 0; size_t threads_count = 0;
size_t count = 0; size_t count = 0;
if (argc != 9) if (argc != 9)
{ {
std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl; std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl;
return 1; return 1;
} }
file_name = argv[1]; file_name = argv[1];
if (argv[2][0] == 'w') if (argv[2][0] == 'w')
mode = MODE_WRITE; mode = MODE_WRITE;
@ -265,38 +265,38 @@ int mainImpl(int argc, char ** argv)
threads_count = Poco::NumberParser::parseUnsigned(argv[6]); threads_count = Poco::NumberParser::parseUnsigned(argv[6]);
buffers_count = Poco::NumberParser::parseUnsigned(argv[7]); buffers_count = Poco::NumberParser::parseUnsigned(argv[7]);
count = Poco::NumberParser::parseUnsigned(argv[8]); count = Poco::NumberParser::parseUnsigned(argv[8]);
int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT); int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT);
if (-1 == fd) if (-1 == fd)
throwFromErrno("Cannot open file"); throwFromErrno("Cannot open file");
typedef std::vector<ExceptionPtr> Exceptions; typedef std::vector<ExceptionPtr> Exceptions;
boost::threadpool::pool pool(threads_count); boost::threadpool::pool pool(threads_count);
Exceptions exceptions(threads_count); Exceptions exceptions(threads_count);
Stopwatch watch; Stopwatch watch;
for (size_t i = 0; i < threads_count; ++i) for (size_t i = 0; i < threads_count; ++i)
pool.schedule(boost::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, boost::ref(exceptions[i]))); pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, std::ref(exceptions[i])));
pool.wait(); pool.wait();
watch.stop(); watch.stop();
for (size_t i = 0; i < threads_count; ++i) for (size_t i = 0; i < threads_count; ++i)
if (exceptions[i]) if (exceptions[i])
exceptions[i]->rethrow(); exceptions[i]->rethrow();
if (0 != close(fd)) if (0 != close(fd))
throwFromErrno("Cannot close file"); throwFromErrno("Cannot close file");
std::cout << std::fixed << std::setprecision(2) std::cout << std::fixed << std::setprecision(2)
<< "Done " << count << " * " << threads_count << " ops"; << "Done " << count << " * " << threads_count << " ops";
std::cout << " in " << watch.elapsedSeconds() << " sec." std::cout << " in " << watch.elapsedSeconds() << " sec."
<< ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec." << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec."
<< ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec."
<< std::endl; << std::endl;
return 0; return 0;
} }