diff --git a/dbms/src/Common/ZooKeeper/tests/zkutil_expiration_test.cpp b/dbms/src/Common/ZooKeeper/tests/zkutil_expiration_test.cpp index 54648bdad8f..073e482e109 100644 --- a/dbms/src/Common/ZooKeeper/tests/zkutil_expiration_test.cpp +++ b/dbms/src/Common/ZooKeeper/tests/zkutil_expiration_test.cpp @@ -38,7 +38,7 @@ int main(int argc, char ** argv) ops.emplace_back(std::make_shared("/test/zk_expiration_test", -1)); zkutil::MultiTransactionInfo info; - zk.tryMultiUnsafe(ops, info); + zk.tryMultiNoThrow(ops, nullptr, &info); std::cout << time(nullptr) - time0 << "s: " << zkutil::ZooKeeper::error2string(info.code) << std::endl; try diff --git a/dbms/src/Server/ClusterCopier.cpp b/dbms/src/Server/ClusterCopier.cpp index 077dc43c5f6..4233ce70e3f 100644 --- a/dbms/src/Server/ClusterCopier.cpp +++ b/dbms/src/Server/ClusterCopier.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -37,7 +38,7 @@ #include #include #include -#include +#include #include #include #include @@ -46,20 +47,20 @@ #include #include #include -#include #include #include -#include +#include #include -#include #include #include #include #include #include #include -#include #include +#include +#include +#include #include #include @@ -697,7 +698,7 @@ void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfiguration & c settings_pull.load_balancing = LoadBalancing::NEAREST_HOSTNAME; settings_pull.readonly = 1; settings_pull.max_threads = 1; - settings_pull.max_block_size = std::min(8192UL, settings_pull.max_block_size.value); + settings_pull.max_block_size = settings_pull.max_block_size.changed ? settings_pull.max_block_size.value : 8192UL; settings_pull.preferred_block_size_bytes = 0; settings_push.insert_distributed_timeout = 0; @@ -759,6 +760,7 @@ public: /// Do not initialize tables, will make deferred initialization in process() + getZooKeeper()->createAncestors(getWorkersPathVersion() + "/"); getZooKeeper()->createAncestors(getWorkersPath() + "/"); } @@ -1011,32 +1013,65 @@ protected: return task_cluster->task_zookeeper_path + "/task_active_workers"; } + String getWorkersPathVersion() const + { + return getWorkersPath() + "_version"; + } + String getCurrentWorkerNodePath() const { return getWorkersPath() + "/" + host_id; } zkutil::EphemeralNodeHolder::Ptr createTaskWorkerNodeAndWaitIfNeed(const zkutil::ZooKeeperPtr & zookeeper, - const String & description) + const String & description, bool unprioritized) { + std::chrono::milliseconds current_sleep_time = default_sleep_time; + static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec + + if (unprioritized) + std::this_thread::sleep_for(current_sleep_time); + + String workers_version_path = getWorkersPathVersion(); + String workers_path = getWorkersPath(); + String current_worker_path = getCurrentWorkerNodePath(); + while (true) { zkutil::Stat stat; - zookeeper->get(getWorkersPath(), &stat); + zookeeper->get(workers_version_path, &stat); + auto version = stat.version; + zookeeper->get(workers_path, &stat); if (static_cast(stat.numChildren) >= task_cluster->max_workers) { LOG_DEBUG(log, "Too many workers (" << stat.numChildren << ", maximum " << task_cluster->max_workers << ")" << ". Postpone processing " << description); - - std::this_thread::sleep_for(default_sleep_time); - - updateConfigIfNeeded(); } else { - return std::make_shared(getCurrentWorkerNodePath(), *zookeeper, true, false, description); + zkutil::Ops ops; + ops.emplace_back(new zkutil::Op::SetData(workers_version_path, description, version)); + ops.emplace_back(new zkutil::Op::Create(current_worker_path, description, zookeeper->getDefaultACL(), zkutil::CreateMode::Ephemeral)); + auto code = zookeeper->tryMulti(ops); + + if (code == ZOK || code == ZNODEEXISTS) + return std::make_shared(current_worker_path, *zookeeper, false, false, description); + + if (code == ZBADVERSION) + { + LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again"); + } + else + throw zkutil::KeeperException(code); } + + if (unprioritized) + current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time); + + std::this_thread::sleep_for(current_sleep_time); + + updateConfigIfNeeded(); } } @@ -1444,9 +1479,8 @@ protected: return parseQuery(p_query, query); }; - /// Load balancing - auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path); + auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_status_path, task_shard.priority.is_remote); LOG_DEBUG(log, "Processing " << current_task_status_path); @@ -1609,8 +1643,15 @@ protected: Context context_insert = context; context_insert.getSettingsRef() = task_cluster->settings_push; - BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute(); - BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute(); + BlockInputStreamPtr input; + BlockOutputStreamPtr output; + { + BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute(); + BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute(); + + input = std::make_shared(io_select.in); + output = io_insert.out; + } using ExistsFuture = zkutil::ZooKeeper::ExistsFuture; auto future_is_dirty_checker = std::make_unique(zookeeper->asyncExists(is_dirty_flag_path)); @@ -1665,7 +1706,7 @@ protected: }; /// Main work is here - copyData(*io_select.in, *io_insert.out, cancel_check, update_stats); + copyData(*input, *output, cancel_check, update_stats); // Just in case if (future_is_dirty_checker != nullptr) diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp index 8db08dfb09a..607eb9efb67 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -337,7 +337,10 @@ void DistributedBlockOutputStream::writeSuffix() throw; } - LOG_DEBUG(log, getCurrentStateDescription()); + double elapsed = watch.elapsedSeconds(); + LOG_DEBUG(log, "It took " << std::fixed << std::setprecision(1) << elapsed << " sec. to insert " << blocks_inserted << " blocks" + << " (average " << std::fixed << std::setprecision(1) << elapsed / blocks_inserted * 1000 << " ms. per block)" + << ". " << getCurrentStateDescription()); } } diff --git a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h index 199700d742c..eed38adf417 100644 --- a/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/dbms/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -118,7 +118,7 @@ private: size_t remote_jobs_count = 0; size_t local_jobs_count = 0; - + std::atomic finished_jobs_count{0}; Poco::Logger * log; diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp index 229c40404bb..ae8677c57be 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMerger.cpp @@ -330,12 +330,12 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu NamesAndTypesList & merging_columns, Names & merging_column_names ) { - Names primary_key_columns_dup = primary_key_expressions->getRequiredColumns(); - std::set key_columns(primary_key_columns_dup.cbegin(), primary_key_columns_dup.cend()); + Names primary_key_columns_vec = primary_key_expressions->getRequiredColumns(); + std::set key_columns(primary_key_columns_vec.cbegin(), primary_key_columns_vec.cend()); if (secondary_key_expressions) { - Names secondary_key_columns_dup = secondary_key_expressions->getRequiredColumns(); - key_columns.insert(secondary_key_columns_dup.begin(), secondary_key_columns_dup.end()); + Names secondary_key_columns_vec = secondary_key_expressions->getRequiredColumns(); + key_columns.insert(secondary_key_columns_vec.begin(), secondary_key_columns_vec.end()); } /// Force sign column for Collapsing mode @@ -350,6 +350,10 @@ static void extractMergingAndGatheringColumns(const NamesAndTypesList & all_colu if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing) key_columns.emplace(merging_params.sign_column); + /// Force to merge at least one column in case of empty key + if (key_columns.empty()) + key_columns.emplace(all_columns.front().name); + /// TODO: also force "summing" and "aggregating" columns to make Horizontal merge only for such columns for (auto & column : all_columns) diff --git a/dbms/tests/integration/test_cluster_copier/task0_description.xml b/dbms/tests/integration/test_cluster_copier/task0_description.xml index 0031a18c552..e8e4df99254 100644 --- a/dbms/tests/integration/test_cluster_copier/task0_description.xml +++ b/dbms/tests/integration/test_cluster_copier/task0_description.xml @@ -1,7 +1,7 @@ - 4 + 3 diff --git a/dbms/tests/queries/0_stateless/00578_merge_trees_without_primary_key.sql b/dbms/tests/queries/0_stateless/00578_merge_trees_without_primary_key.sql index efd881ae006..2e1c0d24024 100644 --- a/dbms/tests/queries/0_stateless/00578_merge_trees_without_primary_key.sql +++ b/dbms/tests/queries/0_stateless/00578_merge_trees_without_primary_key.sql @@ -1,7 +1,7 @@ SELECT '*** MergeTree ***'; DROP TABLE IF EXISTS test.unsorted; -CREATE TABLE test.unsorted (x UInt32, y String) ENGINE MergeTree ORDER BY tuple(); +CREATE TABLE test.unsorted (x UInt32, y String) ENGINE MergeTree ORDER BY tuple() SETTINGS vertical_merge_algorithm_min_rows_to_activate=0, vertical_merge_algorithm_min_columns_to_activate=0; INSERT INTO test.unsorted VALUES (1, 'a'), (5, 'b'); INSERT INTO test.unsorted VALUES (2, 'c'), (4, 'd');