From c2e4662ad7b70b8cc78bc68bd49d2a391ca711b6 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Thu, 16 Oct 2014 05:21:03 +0400 Subject: [PATCH] Changed boost:: to std:: for bind, ref/cref, thread, shared_ptr [#METR-2807]. --- .../AsynchronousBlockInputStream.h | 2 +- .../ParallelAggregatingBlockInputStream.h | 6 +- dbms/include/DB/IO/AsynchronousWriteBuffer.h | 4 +- dbms/include/DB/Interpreters/Dictionaries.h | 10 +-- dbms/include/DB/Storages/StorageChunkMerger.h | 18 ++--- dbms/src/Core/tests/rvo_test.cpp | 18 ++--- dbms/src/DataStreams/tests/fork_streams.cpp | 4 +- dbms/src/DataStreams/tests/glue_streams.cpp | 6 +- dbms/src/DataTypes/DataTypeArray.cpp | 2 - dbms/src/DataTypes/DataTypeNested.cpp | 2 - dbms/src/Interpreters/Cluster.cpp | 1 - .../Interpreters/InterpreterAlterQuery.cpp | 2 - dbms/src/Interpreters/SplittingAggregator.cpp | 22 +++--- dbms/src/Storages/ITableDeclaration.cpp | 1 - dbms/src/Storages/MergeTree/MergeTreeData.cpp | 5 +- dbms/src/Storages/StorageChunkMerger.cpp | 2 +- libs/libcommon/src/tests/multi_version.cpp | 10 +-- libs/libzkutil/include/zkutil/ZooKeeper.h | 4 +- libs/libzkutil/src/ZooKeeper.cpp | 19 ++--- utils/iotest/iotest.cpp | 24 +++--- utils/iotest/iotest_aio.cpp | 78 +++++++++---------- 21 files changed, 118 insertions(+), 122 deletions(-) diff --git a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h index f47a32eef13..36b9488775e 100644 --- a/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h +++ b/dbms/include/DB/DataStreams/AsynchronousBlockInputStream.h @@ -113,7 +113,7 @@ protected: void next() { ready.reset(); - pool.schedule(boost::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker)); + pool.schedule(std::bind(&AsynchronousBlockInputStream::calculate, this, current_memory_tracker)); } diff --git a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h index 239af6a3b16..e9eaee7b021 100644 --- a/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/include/DB/DataStreams/ParallelAggregatingBlockInputStream.h @@ -37,7 +37,7 @@ public: : has_been_read(false), final(final_), max_threads(max_threads_), pool(std::min(max_threads, inputs_.size())) { children.insert(children.end(), inputs_.begin(), inputs_.end()); - + aggregator = new Aggregator(key_names, aggregates, overflow_row_, max_rows_to_group_by_, group_by_overflow_mode_); } @@ -76,8 +76,8 @@ protected: for (size_t i = 0, size = many_data.size(); i < size; ++i) { many_data[i] = new AggregatedDataVariants; - pool.schedule(boost::bind(&ParallelAggregatingBlockInputStream::calculate, this, - boost::ref(children[i]), boost::ref(*many_data[i]), boost::ref(exceptions[i]), current_memory_tracker)); + pool.schedule(std::bind(&ParallelAggregatingBlockInputStream::calculate, this, + std::ref(children[i]), std::ref(*many_data[i]), std::ref(exceptions[i]), current_memory_tracker)); } pool.wait(); diff --git a/dbms/include/DB/IO/AsynchronousWriteBuffer.h b/dbms/include/DB/IO/AsynchronousWriteBuffer.h index 138fbd62c2e..7847fe9395e 100644 --- a/dbms/include/DB/IO/AsynchronousWriteBuffer.h +++ b/dbms/include/DB/IO/AsynchronousWriteBuffer.h @@ -16,7 +16,7 @@ namespace DB { using Poco::SharedPtr; - + /** Записывает данные асинхронно с помощью двойной буферизации. */ @@ -51,7 +51,7 @@ private: swapBuffers(); /// Данные будут записываться в отельном потоке. - pool.schedule(boost::bind(&AsynchronousWriteBuffer::thread, this)); + pool.schedule([this] { thread(); }); } public: diff --git a/dbms/include/DB/Interpreters/Dictionaries.h b/dbms/include/DB/Interpreters/Dictionaries.h index 33b677526bb..a202c8b1cbd 100644 --- a/dbms/include/DB/Interpreters/Dictionaries.h +++ b/dbms/include/DB/Interpreters/Dictionaries.h @@ -1,6 +1,6 @@ #pragma once -#include +#include #include @@ -30,7 +30,7 @@ private: /// Периодичность обновления справочников, в секундах. int reload_period; - boost::thread reloading_thread; + std::thread reloading_thread; Poco::Event destroy; Logger * log; @@ -139,7 +139,7 @@ public: log(&Logger::get("Dictionaries")) { reloadImpl(); - reloading_thread = boost::thread(&Dictionaries::reloadPeriodically, this); + reloading_thread = std::thread([this] { reloadPeriodically(); }); } ~Dictionaries() @@ -157,12 +157,12 @@ public: { return tech_data_hierarchy.get(); } - + MultiVersion::Version getCategoriesHierarchy() const { return categories_hierarchy.get(); } - + MultiVersion::Version getRegionsNames() const { return regions_names.get(); diff --git a/dbms/include/DB/Storages/StorageChunkMerger.h b/dbms/include/DB/Storages/StorageChunkMerger.h index 810a6c14b8f..826f95e2494 100644 --- a/dbms/include/DB/Storages/StorageChunkMerger.h +++ b/dbms/include/DB/Storages/StorageChunkMerger.h @@ -8,7 +8,7 @@ namespace DB { - + /** То и дело объединяет таблицы, подходящие под регэксп, в таблицы типа Chunks. * После объндинения заменяет исходные таблицы таблицами типа ChunkRef. * При чтении ведет себя как таблица типа Merge. @@ -26,10 +26,10 @@ public: const std::string & destination_name_prefix_, /// Префикс имен создаваемых таблиц типа Chunks. size_t chunks_to_merge_, /// Сколько чанков сливать в одну группу. Context & context_); /// Известные таблицы. - + std::string getName() const override { return "ChunkMerger"; } std::string getTableName() const override { return name; } - + const NamesAndTypesList & getColumnsList() const override { return *columns; } NameAndTypePair getColumn(const String & column_name) const override; bool hasColumn(const String & column_name) const override; @@ -45,7 +45,7 @@ public: void shutdown() override; ~StorageChunkMerger() override; - + private: String this_database; String name; @@ -56,13 +56,13 @@ private: size_t chunks_to_merge; Context & context; Settings settings; - - boost::thread merge_thread; + + std::thread merge_thread; Poco::Event cancel_merge_thread; - + Logger * log; volatile bool shutdown_called; - + /// Название виртуального столбца, отвечающего за имя таблицы, из которой идет чтение. (Например "_table") String _table_column_name; @@ -88,5 +88,5 @@ private: /// Нужно смотреть, залочив mutex из контекста. static TableNames currently_written_groups; }; - + } diff --git a/dbms/src/Core/tests/rvo_test.cpp b/dbms/src/Core/tests/rvo_test.cpp index 1d9b6f2af7f..6aaa8d7c0dd 100644 --- a/dbms/src/Core/tests/rvo_test.cpp +++ b/dbms/src/Core/tests/rvo_test.cpp @@ -1,15 +1,15 @@ #include #include #include -#include +#include struct C { volatile int data; - + C(int n = 0) : data(n) {} - + C(const C & x) { *this = x; @@ -37,7 +37,7 @@ C f2() { C x; ++x.data; - + return x; } @@ -45,7 +45,7 @@ C f3() { if (rand() % 10 == 0) return C(123); - + C x; ++x.data; @@ -154,7 +154,7 @@ C f12() C f13() { C x; - + if (rand() % 2) x = f1(); else @@ -218,7 +218,7 @@ struct IFactory virtual ~IFactory() {} }; -typedef boost::shared_ptr FactoryPtr; +typedef std::unique_ptr FactoryPtr; struct Factory1 : IFactory @@ -241,7 +241,7 @@ struct Factory3 : IFactory factory = FactoryPtr(new Factory1); else factory = FactoryPtr(new Factory2); - + return factory->get(); } }; @@ -250,7 +250,7 @@ struct Factory3 : IFactory int main(int argc, char ** argv) { srand(time(0)); - + std::cerr << "f1: " << f1().data << std::endl; std::cerr << "f2: " << f2().data << std::endl; std::cerr << "f3: " << f3().data << std::endl; diff --git a/dbms/src/DataStreams/tests/fork_streams.cpp b/dbms/src/DataStreams/tests/fork_streams.cpp index ed5f04471e4..01c8416e65e 100644 --- a/dbms/src/DataStreams/tests/fork_streams.cpp +++ b/dbms/src/DataStreams/tests/fork_streams.cpp @@ -109,8 +109,8 @@ int main(int argc, char ** argv) BlockOutputStreamPtr out1 = format_factory.getOutput("TabSeparated", ob1, out_sample); BlockOutputStreamPtr out2 = format_factory.getOutput("TabSeparated", ob2, out_sample); - boost::thread thr1(thread1, in1, out1, boost::ref(ob1)); - boost::thread thr2(thread2, in2, out2, boost::ref(ob2)); + std::thread thr1(std::bind(thread1, in1, out1, std::ref(ob1))); + std::thread thr2(std::bind(thread2, in2, out2, std::ref(ob2))); fork.run(); diff --git a/dbms/src/DataStreams/tests/glue_streams.cpp b/dbms/src/DataStreams/tests/glue_streams.cpp index 19e1ddec8a6..432a69d6943 100644 --- a/dbms/src/DataStreams/tests/glue_streams.cpp +++ b/dbms/src/DataStreams/tests/glue_streams.cpp @@ -84,11 +84,11 @@ int main(int argc, char ** argv) boost::threadpool::pool pool(inputs.size() + forks.size()); - pool.schedule(boost::bind(inputThread, inputs[0], out1, boost::ref(wb), boost::ref(mutex))); - pool.schedule(boost::bind(inputThread, inputs[1], out2, boost::ref(wb), boost::ref(mutex))); + pool.schedule(std::bind(inputThread, inputs[0], out1, std::ref(wb), std::ref(mutex))); + pool.schedule(std::bind(inputThread, inputs[1], out2, std::ref(wb), std::ref(mutex))); for (size_t i = 0; i < forks.size(); ++i) - pool.schedule(boost::bind(forkThread, forks[i])); + pool.schedule(std::bind(forkThread, forks[i])); pool.wait(); } diff --git a/dbms/src/DataTypes/DataTypeArray.cpp b/dbms/src/DataTypes/DataTypeArray.cpp index ecb0dff7636..5570e8cf069 100644 --- a/dbms/src/DataTypes/DataTypeArray.cpp +++ b/dbms/src/DataTypes/DataTypeArray.cpp @@ -1,5 +1,3 @@ -#include - #include #include diff --git a/dbms/src/DataTypes/DataTypeNested.cpp b/dbms/src/DataTypes/DataTypeNested.cpp index b2c07073e3d..7bbb5fb3f6e 100644 --- a/dbms/src/DataTypes/DataTypeNested.cpp +++ b/dbms/src/DataTypes/DataTypeNested.cpp @@ -1,5 +1,3 @@ -#include - #include #include #include diff --git a/dbms/src/Interpreters/Cluster.cpp b/dbms/src/Interpreters/Cluster.cpp index 7c7e04ae7f3..c02c6edb2cc 100644 --- a/dbms/src/Interpreters/Cluster.cpp +++ b/dbms/src/Interpreters/Cluster.cpp @@ -3,7 +3,6 @@ #include #include #include -#include namespace DB { diff --git a/dbms/src/Interpreters/InterpreterAlterQuery.cpp b/dbms/src/Interpreters/InterpreterAlterQuery.cpp index 731b05f9d2b..d559f33ecf5 100644 --- a/dbms/src/Interpreters/InterpreterAlterQuery.cpp +++ b/dbms/src/Interpreters/InterpreterAlterQuery.cpp @@ -17,8 +17,6 @@ #include #include -#include -#include using namespace DB; diff --git a/dbms/src/Interpreters/SplittingAggregator.cpp b/dbms/src/Interpreters/SplittingAggregator.cpp index b4603c13d31..2b97be9e5b7 100644 --- a/dbms/src/Interpreters/SplittingAggregator.cpp +++ b/dbms/src/Interpreters/SplittingAggregator.cpp @@ -87,11 +87,11 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData /// Параллельно вычисляем хэши и ключи. for (size_t thread_no = 0; thread_no < threads; ++thread_no) - pool.schedule(boost::bind(&SplittingAggregator::calculateHashesThread, this, - boost::ref(block), + pool.schedule(std::bind(&SplittingAggregator::calculateHashesThread, this, + std::ref(block), rows * thread_no / threads, rows * (thread_no + 1) / threads, - boost::ref(exceptions[thread_no]), + std::ref(exceptions[thread_no]), current_memory_tracker)); pool.wait(); @@ -101,11 +101,11 @@ void SplittingAggregator::execute(BlockInputStreamPtr stream, ManyAggregatedData /// Параллельно агрегируем в независимые хэш-таблицы for (size_t thread_no = 0; thread_no < threads; ++thread_no) - pool.schedule(boost::bind(&SplittingAggregator::aggregateThread, this, - boost::ref(block), - boost::ref(*results[thread_no]), + pool.schedule(std::bind(&SplittingAggregator::aggregateThread, this, + std::ref(block), + std::ref(*results[thread_no]), thread_no, - boost::ref(exceptions[thread_no]), + std::ref(exceptions[thread_no]), current_memory_tracker)); pool.wait(); @@ -131,11 +131,11 @@ void SplittingAggregator::convertToBlocks(ManyAggregatedDataVariants & data_vari /// Параллельно конвертируем в блоки. for (size_t thread_no = 0; thread_no < threads; ++thread_no) - pool.schedule(boost::bind(&SplittingAggregator::convertToBlockThread, this, - boost::ref(*data_variants[thread_no]), - boost::ref(blocks[thread_no]), + pool.schedule(std::bind(&SplittingAggregator::convertToBlockThread, this, + std::ref(*data_variants[thread_no]), + std::ref(blocks[thread_no]), final, - boost::ref(exceptions[thread_no]), + std::ref(exceptions[thread_no]), current_memory_tracker)); pool.wait(); diff --git a/dbms/src/Storages/ITableDeclaration.cpp b/dbms/src/Storages/ITableDeclaration.cpp index 0468ba13451..c70ce422220 100644 --- a/dbms/src/Storages/ITableDeclaration.cpp +++ b/dbms/src/Storages/ITableDeclaration.cpp @@ -6,7 +6,6 @@ #include #include #include -#include namespace DB { diff --git a/dbms/src/Storages/MergeTree/MergeTreeData.cpp b/dbms/src/Storages/MergeTree/MergeTreeData.cpp index 675f82a55cd..6f02a5f4e20 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeData.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeData.cpp @@ -1,6 +1,7 @@ -#include #include #include + +#include #include #include #include @@ -12,7 +13,9 @@ #include #include #include + #include +#include diff --git a/dbms/src/Storages/StorageChunkMerger.cpp b/dbms/src/Storages/StorageChunkMerger.cpp index f88751c5740..1cbde313bbb 100644 --- a/dbms/src/Storages/StorageChunkMerger.cpp +++ b/dbms/src/Storages/StorageChunkMerger.cpp @@ -232,7 +232,7 @@ StorageChunkMerger::StorageChunkMerger( context(context_), settings(context.getSettings()), log(&Logger::get("StorageChunkMerger")), shutdown_called(false) { - merge_thread = boost::thread(&StorageChunkMerger::mergeThread, this); + merge_thread = std::thread([this] { mergeThread(); }); _table_column_name = "_table" + VirtualColumnUtils::chooseSuffix(getColumnsList(), "_table"); } diff --git a/libs/libcommon/src/tests/multi_version.cpp b/libs/libcommon/src/tests/multi_version.cpp index 611ef15c32d..c26bb8198cc 100644 --- a/libs/libcommon/src/tests/multi_version.cpp +++ b/libs/libcommon/src/tests/multi_version.cpp @@ -1,7 +1,7 @@ #include #include #include -#include +#include #include @@ -28,7 +28,7 @@ int main(int argc, char ** argv) { const char * s1 = "Hello!"; const char * s2 = "Goodbye!"; - + size_t n = 1000; MV x(new T(s1)); Results results(n); @@ -36,8 +36,8 @@ int main(int argc, char ** argv) boost::threadpool::pool tp(8); for (size_t i = 0; i < n; ++i) { - tp.schedule(boost::bind(thread1, boost::ref(x), boost::ref(results[i]))); - tp.schedule(boost::bind(thread2, boost::ref(x), (rand() % 2) ? s1 : s2)); + tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i]))); + tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2)); } tp.wait(); @@ -50,6 +50,6 @@ int main(int argc, char ** argv) std::cerr << e.message() << std::endl; throw; } - + return 0; } diff --git a/libs/libzkutil/include/zkutil/ZooKeeper.h b/libs/libzkutil/include/zkutil/ZooKeeper.h index df39bc3bec6..640296cd380 100644 --- a/libs/libzkutil/include/zkutil/ZooKeeper.h +++ b/libs/libzkutil/include/zkutil/ZooKeeper.h @@ -175,7 +175,7 @@ private: static void processEvent(zhandle_t * zh, int type, int state, const char * path, void *watcherCtx); template - int32_t retry(const T & operation, size_t * attempt = nullptr) + int32_t retry(T && operation, size_t * attempt = nullptr) { int32_t code = operation(); if (attempt) @@ -187,7 +187,7 @@ private: /// если потеряно соединение подождем timeout/3, авось восстановится if (code == ZCONNECTIONLOSS) - usleep(sessionTimeoutMs*1000/3); + usleep(sessionTimeoutMs * 1000 / 3); LOG_WARNING(log, "Error on attempt " << i << ": " << error2string(code) << ". Retry"); code = operation(); diff --git a/libs/libzkutil/src/ZooKeeper.cpp b/libs/libzkutil/src/ZooKeeper.cpp index d1df303973f..7be5260640c 100644 --- a/libs/libzkutil/src/ZooKeeper.cpp +++ b/libs/libzkutil/src/ZooKeeper.cpp @@ -1,8 +1,9 @@ +#include #include #include #include #include -#include + namespace zkutil { @@ -176,7 +177,7 @@ Strings ZooKeeper::getChildren( int32_t ZooKeeper::tryGetChildren(const std::string & path, Strings & res, Stat * stat_, EventPtr watch) { - int32_t code = retry(boost::bind(&ZooKeeper::getChildrenImpl, this, boost::ref(path), boost::ref(res), stat_, watch)); + int32_t code = retry(std::bind(&ZooKeeper::getChildrenImpl, this, std::ref(path), std::ref(res), stat_, watch)); if (!( code == ZOK || code == ZNONODE)) @@ -232,16 +233,16 @@ int32_t ZooKeeper::tryCreate(const std::string & path, const std::string & data, return tryCreate(path, data, mode, pathCreated); } -int32_t ZooKeeper::tryCreateWithRetries(const std::string& path, const std::string& data, int32_t mode, std::string& pathCreated, size_t* attempt) +int32_t ZooKeeper::tryCreateWithRetries(const std::string & path, const std::string & data, int32_t mode, std::string & pathCreated, size_t* attempt) { - return retry(boost::bind(&ZooKeeper::tryCreate, this, boost::ref(path), boost::ref(data), mode, boost::ref(pathCreated)), attempt); + return retry([&path, &data, mode, &pathCreated, this] { return tryCreate(path, data, mode, pathCreated); }); } void ZooKeeper::createIfNotExists(const std::string & path, const std::string & data) { std::string pathCreated; - int32_t code = retry(boost::bind(&ZooKeeper::createImpl, this, boost::ref(path), boost::ref(data), zkutil::CreateMode::Persistent, boost::ref(pathCreated))); + int32_t code = retry(std::bind(&ZooKeeper::createImpl, this, std::ref(path), std::ref(data), zkutil::CreateMode::Persistent, std::ref(pathCreated))); if (code == ZOK || code == ZNODEEXISTS) return; @@ -288,7 +289,7 @@ int32_t ZooKeeper::tryRemove(const std::string & path, int32_t version) int32_t ZooKeeper::tryRemoveWithRetries(const std::string & path, int32_t version, size_t * attempt) { - int32_t code = retry(boost::bind(&ZooKeeper::removeImpl, this, boost::ref(path), version), attempt); + int32_t code = retry(std::bind(&ZooKeeper::removeImpl, this, std::ref(path), version), attempt); if (!( code == ZOK || code == ZNONODE || code == ZBADVERSION || @@ -316,7 +317,7 @@ int32_t ZooKeeper::existsImpl(const std::string & path, Stat * stat_, EventPtr w bool ZooKeeper::exists(const std::string & path, Stat * stat_, EventPtr watch) { - int32_t code = retry(boost::bind(&ZooKeeper::existsImpl, this, path, stat_, watch)); + int32_t code = retry(std::bind(&ZooKeeper::existsImpl, this, path, stat_, watch)); if (!( code == ZOK || code == ZNONODE)) @@ -360,7 +361,7 @@ std::string ZooKeeper::get(const std::string & path, Stat * stat, EventPtr watch bool ZooKeeper::tryGet(const std::string & path, std::string & res, Stat * stat_, EventPtr watch) { - int32_t code = retry(boost::bind(&ZooKeeper::getImpl, this, boost::ref(path), boost::ref(res), stat_, watch)); + int32_t code = retry(std::bind(&ZooKeeper::getImpl, this, std::ref(path), std::ref(res), stat_, watch)); if (!( code == ZOK || code == ZNONODE)) @@ -455,7 +456,7 @@ int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_) int32_t ZooKeeper::tryMultiWithRetries(const Ops & ops, OpResultsPtr * out_results, size_t * attempt) { - int32_t code = retry(boost::bind(&ZooKeeper::multiImpl, this, boost::ref(ops), out_results), attempt); + int32_t code = retry(std::bind(&ZooKeeper::multiImpl, this, std::ref(ops), out_results), attempt); if (!(code == ZOK || code == ZNONODE || code == ZNODEEXISTS || diff --git a/utils/iotest/iotest.cpp b/utils/iotest/iotest.cpp index aeeea7ca617..b5d690c4b17 100644 --- a/utils/iotest/iotest.cpp +++ b/utils/iotest/iotest.cpp @@ -42,7 +42,7 @@ struct AlignedBuffer { int size; char * data; - + AlignedBuffer(int size_) { size_t page = sysconf(_SC_PAGESIZE); @@ -51,7 +51,7 @@ struct AlignedBuffer if (!data) throwFromErrno("memalign failed"); } - + ~AlignedBuffer() { free(data); @@ -63,15 +63,15 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block try { AlignedBuffer direct_buf(block_size); - + std::vector simple_buf(block_size); - + char * buf; if ((mode & MODE_DIRECT)) buf = direct_buf.data; else buf = &simple_buf[0]; - + drand48_data rand_data; timespec times; @@ -86,7 +86,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block lrand48_r(&rand_data, &rand_result1); lrand48_r(&rand_data, &rand_result2); lrand48_r(&rand_data, &rand_result3); - + for (size_t j = 0; j + 3 < block_size; j += 3) { long r; @@ -149,7 +149,7 @@ int mainImpl(int argc, char ** argv) block_size = Poco::NumberParser::parseUnsigned64(argv[5]); threads = Poco::NumberParser::parseUnsigned(argv[6]); count = Poco::NumberParser::parseUnsigned(argv[7]); - + for (int i = 0; argv[2][i]; ++i) { char c = argv[2][i]; @@ -174,7 +174,7 @@ int mainImpl(int argc, char ** argv) throw Poco::Exception("Invalid mode"); } } - + boost::threadpool::pool pool(threads); int fd = open(file_name, ((mode & MODE_READ) ? O_RDONLY : O_WRONLY) | ((mode & MODE_DIRECT) ? O_DIRECT : 0) | ((mode & MODE_SYNC) ? O_SYNC : 0)); @@ -185,11 +185,11 @@ int mainImpl(int argc, char ** argv) Exceptions exceptions(threads); Stopwatch watch; - + for (size_t i = 0; i < threads; ++i) - pool.schedule(boost::bind(thread, fd, mode, min_offset, max_offset, block_size, count, boost::ref(exceptions[i]))); + pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, count, std::ref(exceptions[i]))); pool.wait(); - + fsync(fd); for (size_t i = 0; i < threads; ++i) @@ -213,7 +213,7 @@ int mainImpl(int argc, char ** argv) << ", " << count * threads / watch.elapsedSeconds() << " ops/sec." << ", " << count * threads * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." << std::endl; - + return 0; } diff --git a/utils/iotest/iotest_aio.cpp b/utils/iotest/iotest_aio.cpp index 0265b6fc618..44aba26209f 100644 --- a/utils/iotest/iotest_aio.cpp +++ b/utils/iotest/iotest_aio.cpp @@ -35,12 +35,12 @@ inline int io_setup(unsigned nr, aio_context_t *ctxp) return syscall(__NR_io_setup, nr, ctxp); } -inline int io_destroy(aio_context_t ctx) +inline int io_destroy(aio_context_t ctx) { return syscall(__NR_io_destroy, ctx); } -inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) +inline int io_submit(aio_context_t ctx, long nr, struct iocb **iocbpp) { return syscall(__NR_io_submit, ctx, nr, iocbpp); } @@ -62,9 +62,9 @@ struct AlignedBuffer { int size = 0; char * data = nullptr; - + AlignedBuffer() {} - + void init(int size_) { uninit(); @@ -74,7 +74,7 @@ struct AlignedBuffer if (!data) throwFromErrno("memalign failed"); } - + void uninit() { if (data) @@ -82,12 +82,12 @@ struct AlignedBuffer data = nullptr; size = 0; } - + AlignedBuffer(int size_) : size(0), data(NULL) { init(size_); } - + ~AlignedBuffer() { uninit(); @@ -97,14 +97,14 @@ struct AlignedBuffer struct AioContext { aio_context_t ctx; - + AioContext() { ctx = 0; if (io_setup(128, &ctx) < 0) throwFromErrno("io_setup failed"); } - + ~AioContext() { io_destroy(ctx); @@ -119,25 +119,25 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block try { AioContext ctx; - + std::vector buffers(buffers_count); for (size_t i = 0; i < buffers_count; ++i) { buffers[i].init(block_size); } - + drand48_data rand_data; timespec times; clock_gettime(CLOCK_THREAD_CPUTIME_ID, ×); srand48_r(times.tv_nsec, &rand_data); - + size_t in_progress = 0; size_t blocks_sent = 0; std::vector buffer_used(buffers_count, false); std::vector iocbs(buffers_count); std::vector query_cbs; std::vector events(buffers_count); - + while (blocks_sent < count || in_progress > 0) { /// Составим запросы. @@ -146,16 +146,16 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block { if (blocks_sent >= count || in_progress >= buffers_count) break; - + if (buffer_used[i]) continue; - + buffer_used[i] = true; ++blocks_sent; ++in_progress; - + char * buf = buffers[i].data; - + for (size_t j = 0; j + 3 < block_size; j += 3) { long r; @@ -164,17 +164,17 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block buf[j + 1] = static_cast((r >> 8) & 255); buf[j + 2] = static_cast((r >> 16) & 255); } - + long rand_result1 = 0; long rand_result2 = 0; long rand_result3 = 0; lrand48_r(&rand_data, &rand_result1); lrand48_r(&rand_data, &rand_result2); lrand48_r(&rand_data, &rand_result3); - + size_t rand_result = rand_result1 ^ (rand_result2 << 22) ^ (rand_result3 << 43); size_t offset = min_offset + rand_result % ((max_offset - min_offset) / block_size) * block_size; - + iocb & cb = iocbs[i]; memset(&cb, 0, sizeof(cb)); cb.aio_buf = reinterpret_cast(buf); @@ -182,7 +182,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block cb.aio_nbytes = block_size; cb.aio_offset = offset; cb.aio_data = static_cast(i); - + if (mode == MODE_READ) { cb.aio_lio_opcode = IOCB_CMD_PREAD; @@ -191,20 +191,20 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block { cb.aio_lio_opcode = IOCB_CMD_PWRITE; } - + query_cbs.push_back(&cb); } - + /// Отправим запросы. if (io_submit(ctx.ctx, query_cbs.size(), &query_cbs[0]) < 0) throwFromErrno("io_submit failed"); - + /// Получим ответы. Если еще есть что отправлять, получим хотя бы один ответ (после этого пойдем отправлять), иначе дождемся всех ответов. memset(&events[0], 0, buffers_count * sizeof(events[0])); int evs = io_getevents(ctx.ctx, (blocks_sent < count ? 1 : in_progress), buffers_count, &events[0], nullptr); if (evs < 0) throwFromErrno("io_getevents failed"); - + for (int i = 0; i < evs; ++i) { int b = static_cast(events[i].data); @@ -214,7 +214,7 @@ void thread(int fd, int mode, size_t min_offset, size_t max_offset, size_t block buffer_used[b] = false; } } - + // iocb cb; // memset(&cb, 0, sizeof(cb)); // cb.aio_lio_opcode = IOCB_CMD_FSYNC; @@ -249,13 +249,13 @@ int mainImpl(int argc, char ** argv) size_t buffers_count = 0; size_t threads_count = 0; size_t count = 0; - + if (argc != 9) { std::cerr << "Usage: " << argv[0] << " file_name r|w min_offset max_offset block_size threads buffers count" << std::endl; return 1; } - + file_name = argv[1]; if (argv[2][0] == 'w') mode = MODE_WRITE; @@ -265,38 +265,38 @@ int mainImpl(int argc, char ** argv) threads_count = Poco::NumberParser::parseUnsigned(argv[6]); buffers_count = Poco::NumberParser::parseUnsigned(argv[7]); count = Poco::NumberParser::parseUnsigned(argv[8]); - + int fd = open(file_name, ((mode == MODE_READ) ? O_RDONLY : O_WRONLY) | O_DIRECT); if (-1 == fd) throwFromErrno("Cannot open file"); - + typedef std::vector Exceptions; - + boost::threadpool::pool pool(threads_count); Exceptions exceptions(threads_count); - + Stopwatch watch; - + for (size_t i = 0; i < threads_count; ++i) - pool.schedule(boost::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, boost::ref(exceptions[i]))); + pool.schedule(std::bind(thread, fd, mode, min_offset, max_offset, block_size, buffers_count, count, std::ref(exceptions[i]))); pool.wait(); - + watch.stop(); - + for (size_t i = 0; i < threads_count; ++i) if (exceptions[i]) exceptions[i]->rethrow(); - + if (0 != close(fd)) throwFromErrno("Cannot close file"); - + std::cout << std::fixed << std::setprecision(2) << "Done " << count << " * " << threads_count << " ops"; std::cout << " in " << watch.elapsedSeconds() << " sec." << ", " << count * threads_count / watch.elapsedSeconds() << " ops/sec." << ", " << count * threads_count * block_size / watch.elapsedSeconds() / 1000000 << " MB/sec." << std::endl; - + return 0; }