/// Benchmark of several "aggregate per thread, then merge" strategies over hash maps.
///
/// Usage (as read by main): argv[1] = n (number of keys), argv[2] = num_threads,
/// argv[3] = method (optional; absent or 0 runs every method in turn).
/// main reads sizeof(data[0]) * n bytes into `data` with readStrict, through a
/// DB::CompressedReadBuffer wrapped around a DB::ReadBufferFromFileDescriptor on STDIN_FILENO.
/// All timings and sizes are printed to std::cerr.
///
/// Building blocks:
///  - AggregateIndependent: creates num_threads maps, splits `data` into num_threads contiguous
///    ranges and schedules one job per range on the pool. Each job emplaces every key of its range
///    into its own map, calling `creator` on a newly inserted value and `updater` otherwise;
///    execute() returns after pool.wait().
///  - AggregateIndependentWithSequentialKeysOptimization: same partitioning, but each job remembers
///    the previous key and its lookup result; when the current key equals the previous one (and it
///    is not the first element of the range) `updater` is applied to the remembered place and the
///    emplace call is skipped.
///  - MergeSequential: for every source map with index >= 1, walks all entries and calls
///    merger((*source_maps[0])[key], mapped); the result is source_maps[0].
///  - MergeSequentialTransposed: keeps one iterator per source map with index >= 1 and, round by
///    round, merges one entry from each not-yet-exhausted map into source_maps[0]; stops after a
///    round in which every iterator was already at end().
///  - MergeParallelForTwoLevelTable: for each bucket in [0, Map::NUM_BUCKETS) schedules a job that
///    collects &source_maps[i]->impls[bucket] for all maps and runs ImplMerge::execute on that
///    section; after pool.wait() the result is source_maps[0]. The `res` pointer filled in by
///    ImplMerge is not used afterwards.
///  - Work: runs Aggregate::execute and Merge::execute, timing each with a Stopwatch, and prints
///    the aggregation time, per-map sizes, merge time, total time and final map size.
///  - Creator does nothing, Updater increments the value, Merger adds src to dst.
///
/// Methods dispatched in main: 1-4 use Map, 5-8 use MapTwoLevel, each group combining
/// {AggregateIndependent, AggregateIndependentWithSequentialKeysOptimization} with
/// {MergeSequential, MergeSequentialTransposed}; 9, 10, 13, 14 use MapTwoLevel with
/// MergeParallelForTwoLevelTable. No branch handles methods 11 or 12.
///
/// The `#pragma GCC diagnostic` pairs disable -Wmaybe-uninitialized around the sequential-keys
/// aggregator and around Updater when the compiler is not clang.
///
/// NOTE(review): this text appears to have lost everything that was written between angle brackets
/// (include targets, template parameter lists, template arguments) together with the original line
/// breaks. As stored, the `//#define` on the first code line comments out the remainder of that
/// line, and methods 9/10 read identically to 13/14 because the nested merge argument that
/// presumably distinguishes them is not visible. Restore from the upstream file before building.
/// NOTE(review): argv[1] and argv[2] are read without checking argc.
/// NOTE(review): Work prints sizes with a loop bounded by num_threads while indexing
/// intermediate_results; this relies on Aggregate producing exactly num_threads maps, which both
/// aggregators visible here do.
/// NOTE(review): Creator leaves a newly inserted value untouched, so the stored value for a key
/// depends on how emplace initialises the mapped value - confirm against the hash map implementation.
#include #include #include #include //#define DBMS_HASH_MAP_DEBUG_RESIZES #include #include #include //#include //#include #include #include #include #include using Key = UInt64; using Value = UInt64; using Source = std::vector; template struct AggregateIndependent { template static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector> & results, Creator && creator, Updater && updater, ThreadPool & pool) { results.reserve(num_threads); for (size_t i = 0; i < num_threads; ++i) results.emplace_back(std::make_unique()); for (size_t i = 0; i < num_threads; ++i) { auto begin = data.begin() + (data.size() * i) / num_threads; auto end = data.begin() + (data.size() * (i + 1)) / num_threads; auto & map = *results[i]; pool.scheduleOrThrowOnError([&, begin, end]() { for (auto it = begin; it != end; ++it) { typename Map::LookupResult place; bool inserted; map.emplace(*it, place, inserted); if (inserted) creator(place->getMapped()); else updater(place->getMapped()); } }); } pool.wait(); } }; #if !defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" #endif template struct AggregateIndependentWithSequentialKeysOptimization { template static void NO_INLINE execute(const Source & data, size_t num_threads, std::vector> & results, Creator && creator, Updater && updater, ThreadPool & pool) { results.reserve(num_threads); for (size_t i = 0; i < num_threads; ++i) results.emplace_back(std::make_unique()); for (size_t i = 0; i < num_threads; ++i) { auto begin = data.begin() + (data.size() * i) / num_threads; auto end = data.begin() + (data.size() * (i + 1)) / num_threads; auto & map = *results[i]; pool.scheduleOrThrowOnError([&, begin, end]() { typename Map::LookupResult place = nullptr; Key prev_key {}; for (auto it = begin; it != end; ++it) { if (it != begin && *it == prev_key) { assert(place != nullptr); updater(place->getMapped()); continue; } prev_key = *it; bool inserted; map.emplace(*it, place, inserted); 
assert(place != nullptr); if (inserted) creator(place->getMapped()); else updater(place->getMapped()); } }); } pool.wait(); } }; #if !defined(__clang__) #pragma GCC diagnostic pop #endif template struct MergeSequential { template static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, Merger && merger, ThreadPool &) { for (size_t i = 1; i < num_maps; ++i) { auto begin = source_maps[i]->begin(); auto end = source_maps[i]->end(); for (auto it = begin; it != end; ++it) merger((*source_maps[0])[it->getKey()], it->getMapped()); } result_map = source_maps[0]; } }; template struct MergeSequentialTransposed /// In practice not better than usual. { template static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, Merger && merger, ThreadPool &) { std::vector iterators(num_maps); for (size_t i = 1; i < num_maps; ++i) iterators[i] = source_maps[i]->begin(); result_map = source_maps[0]; while (true) { bool finish = true; for (size_t i = 1; i < num_maps; ++i) { if (iterators[i] == source_maps[i]->end()) continue; finish = false; merger((*result_map)[iterators[i]->getKey()], iterators[i]->getMapped()); ++iterators[i]; } if (finish) break; } } }; template struct MergeParallelForTwoLevelTable { template static void NO_INLINE execute(Map ** source_maps, size_t num_maps, Map *& result_map, Merger && merger, ThreadPool & pool) { for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket) pool.scheduleOrThrowOnError([&, bucket, num_maps] { std::vector section(num_maps); for (size_t i = 0; i < num_maps; ++i) section[i] = &source_maps[i]->impls[bucket]; typename Map::Impl * res; ImplMerge::execute(section.data(), num_maps, res, merger, pool); }); pool.wait(); result_map = source_maps[0]; } }; template struct Work { template static void NO_INLINE execute(const Source & data, size_t num_threads, Creator && creator, Updater && updater, Merger && merger, ThreadPool & pool) { std::vector> intermediate_results; Stopwatch watch; 
Aggregate::execute(data, num_threads, intermediate_results, std::forward(creator), std::forward(updater), pool); size_t num_maps = intermediate_results.size(); watch.stop(); double time_aggregated = watch.elapsedSeconds(); std::cerr << "Aggregated in " << time_aggregated << " (" << data.size() / time_aggregated << " elem/sec.)" << std::endl; size_t size_before_merge = 0; std::cerr << "Sizes: "; for (size_t i = 0; i < num_threads; ++i) { std::cerr << (i == 0 ? "" : ", ") << intermediate_results[i]->size(); size_before_merge += intermediate_results[i]->size(); } std::cerr << std::endl; watch.restart(); std::vector intermediate_results_ptrs(num_maps); for (size_t i = 0; i < num_maps; ++i) intermediate_results_ptrs[i] = intermediate_results[i].get(); Map * result_map; Merge::execute(intermediate_results_ptrs.data(), num_maps, result_map, std::forward(merger), pool); watch.stop(); double time_merged = watch.elapsedSeconds(); std::cerr << "Merged in " << time_merged << " (" << size_before_merge / time_merged << " elem/sec.)" << std::endl; double time_total = time_aggregated + time_merged; std::cerr << "Total in " << time_total << " (" << data.size() / time_total << " elem/sec.)" << std::endl; std::cerr << "Size: " << result_map->size() << std::endl << std::endl; } }; using Map = HashMap>; using MapTwoLevel = TwoLevelHashMap>; using Mutex = std::mutex; struct Creator { void operator()(Value &) const {} }; #if !defined(__clang__) #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wmaybe-uninitialized" #endif struct Updater { void operator()(Value & x) const { ++x; } }; #if !defined(__clang__) #pragma GCC diagnostic pop #endif struct Merger { void operator()(Value & dst, const Value & src) const { dst += src; } }; int main(int argc, char ** argv) { size_t n = std::stol(argv[1]); size_t num_threads = std::stol(argv[2]); size_t method = argc <= 3 ? 
0 : std::stol(argv[3]); std::cerr << std::fixed << std::setprecision(2); ThreadPool pool(num_threads); Source data(n); { Stopwatch watch; DB::ReadBufferFromFileDescriptor in1(STDIN_FILENO); DB::CompressedReadBuffer in2(in1); in2.readStrict(reinterpret_cast(data.data()), sizeof(data[0]) * n); watch.stop(); std::cerr << std::fixed << std::setprecision(2) << "Vector. Size: " << n << ", elapsed: " << watch.elapsedSeconds() << " (" << n / watch.elapsedSeconds() << " elem/sec.)" << std::endl << std::endl; } Creator creator; Updater updater; Merger merger; if (!method || method == 1) Work< Map, AggregateIndependent, MergeSequential >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 2) Work< Map, AggregateIndependentWithSequentialKeysOptimization, MergeSequential >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 3) Work< Map, AggregateIndependent, MergeSequentialTransposed >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 4) Work< Map, AggregateIndependentWithSequentialKeysOptimization, MergeSequentialTransposed >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 5) Work< MapTwoLevel, AggregateIndependent, MergeSequential >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 6) Work< MapTwoLevel, AggregateIndependentWithSequentialKeysOptimization, MergeSequential >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 7) Work< MapTwoLevel, AggregateIndependent, MergeSequentialTransposed >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 8) Work< MapTwoLevel, AggregateIndependentWithSequentialKeysOptimization, MergeSequentialTransposed >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 9) Work< MapTwoLevel, AggregateIndependent, MergeParallelForTwoLevelTable> >::execute(data, num_threads, 
creator, updater, merger, pool); if (!method || method == 10) Work< MapTwoLevel, AggregateIndependentWithSequentialKeysOptimization, MergeParallelForTwoLevelTable> >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 13) Work< MapTwoLevel, AggregateIndependent, MergeParallelForTwoLevelTable> >::execute(data, num_threads, creator, updater, merger, pool); if (!method || method == 14) Work< MapTwoLevel, AggregateIndependentWithSequentialKeysOptimization, MergeParallelForTwoLevelTable> >::execute(data, num_threads, creator, updater, merger, pool); return 0; }