diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.cpp b/dbms/src/Interpreters/evaluateMissingDefaults.cpp index c5cb36e3979..3b8ecd023d1 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.cpp +++ b/dbms/src/Interpreters/evaluateMissingDefaults.cpp @@ -48,18 +48,10 @@ ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & requi { if (!block.has(required_column.name)) continue; - //throw Exception("Required conversion of column " + required_column.name + " which is absent in block. It's a bug", ErrorCodes::LOGICAL_ERROR); auto column_in_block = block.getByName(required_column.name); - //std::cerr << "Looking at:" << required_column.name << std::endl; - //std::cerr << "In block type:" << column_in_block.type->getName() << std::endl; - //std::cerr << "Required type:" << required_column.type->getName() << std::endl; if (column_in_block.type->equals(*required_column.type)) - { - //std::cerr << "TYPES ARE SAME\n"; continue; - } - //std::cerr << "TYPES ARE DIFFERENT\n"; auto cast_func = makeASTFunction( "CAST", std::make_shared(required_column.name), std::make_shared(required_column.type->getName())); @@ -114,9 +106,7 @@ void executeExpressionsOnBlock( copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared(), "__dummy"}); } - //std::cerr << "Block before expression:" << copy_block.dumpStructure() << std::endl; expression_analyzer.getActions(true)->execute(copy_block); - //std::cerr << "Block after expression:" << copy_block.dumpStructure() << std::endl; /// move evaluated columns to the original block, materializing them at the same time size_t pos = 0; @@ -140,8 +130,6 @@ void executeExpressionsOnBlock( void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context) { ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns); - //std::cerr << queryToString(conversion_expr_list) << std::endl; - //std::cerr << "Block:" << block.dumpStructure() << std::endl; if (conversion_expr_list->children.empty()) return; executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context); diff --git a/dbms/src/Interpreters/evaluateMissingDefaults.h b/dbms/src/Interpreters/evaluateMissingDefaults.h index 51db620c86f..18156388b97 100644 --- a/dbms/src/Interpreters/evaluateMissingDefaults.h +++ b/dbms/src/Interpreters/evaluateMissingDefaults.h @@ -17,6 +17,7 @@ void evaluateMissingDefaults(Block & block, const std::unordered_map & column_defaults, const Context & context, bool save_unneeded_columns = true); +/// Tries to convert columns in block to required_columns void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index 964433fd385..74b9bda8992 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -56,7 +56,6 @@ MergeTreeReader::MergeTreeReader( , mmap_threshold(mmap_threshold_) , max_read_buffer_size(max_read_buffer_size_) { - ////std::cerr << "Merge tree reader created for part:" << data_part->name << std::endl; try { for (const NameAndTypePair & column_from_part : data_part->columns) @@ -66,22 +65,12 @@ MergeTreeReader::MergeTreeReader( for (const NameAndTypePair & column : columns) { - ////std::cerr << "Column name to read:" << column.name << std::endl; if (columns_from_part.count(column.name)) - { - ////std::cerr << "With type:" << columns_from_part[column.name]->getName() << std::endl; - ////std::cerr << "Original type:" << column.type->getName() << std::endl; - addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_); - } else - { - ////std::cerr << "Original type:" << column.type->getName() << std::endl; addStreams(column.name, *column.type, profile_callback_, clock_type_); - } } - ////std::cerr << "COLUMNS IN CONSTRUCTOR:" << columns.toString() << std::endl; } catch (...) { @@ -399,7 +388,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns size_t num_columns = columns.size(); if (res_columns.size() != num_columns) - throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. " + throw Exception("Invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. " "Expected " + toString(num_columns) + ", " "got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR); @@ -414,9 +403,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name}); } - //std::cerr << "additional columns before:" << additional_columns.dumpStructure() << std::endl; DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context); - //std::cerr << "additional columns after:" << additional_columns.dumpStructure() << std::endl; /// Move columns from block. name_and_type = columns.begin(); @@ -440,7 +427,7 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns) if (res_columns.size() != num_columns) { throw Exception( - "invalid number of columns passed to MergeTreeReader::performRequiredConversions. " + "Invalid number of columns passed to MergeTreeReader::performRequiredConversions. " "Expected " + toString(num_columns) + ", " @@ -451,32 +438,19 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns) Block copy_block; auto name_and_type = columns.begin(); - ////std::cerr << "DATAPART NAMES AND TYPES:" << data_part->columns.toString() << std::endl; - ////std::cerr << "REQUIRED COLUMNS NAMES AND TYPES:" << columns.toString() << std::endl; - ////std::cerr << "RES COLUMNS SIZE:" << res_columns.size() << std::endl; - ////std::cerr << "RES COLUMNS STRUCTURE:\n"; - //for (const auto & column : res_columns) - //{ - // //std::cerr << column->dumpStructure() << std::endl; - //} for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type) { - ////std::cerr << "POS:" << pos << std::endl; if (res_columns[pos] == nullptr) continue; - ////std::cerr << "POS NAME:" << name_and_type->name << std::endl; - ////std::cerr << "POS TYPE:" << name_and_type->type->getName() << std::endl; if (columns_from_part.count(name_and_type->name)) copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name}); else copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name}); } - ////std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl; DB::performRequiredConversions(copy_block, columns, storage.global_context); - ////std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl; /// Move columns from block. name_and_type = columns.begin(); diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.h b/dbms/src/Storages/MergeTree/MergeTreeReader.h index d0562fe3300..5352b5a2073 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.h +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.h @@ -45,7 +45,8 @@ public: /// Evaluate defaulted columns if necessary. void evaluateMissingDefaults(Block additional_columns, Columns & res_columns); - /// Perform conversions TODO(alesap) + /// If part metadata is not equal to storage metadata, than + /// try to perform conversions of columns. void performRequiredConversions(Columns & res_columns); const NamesAndTypesList & getColumns() const { return columns; } diff --git a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h index d8c917cac6c..cef9cb28e85 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h +++ b/dbms/src/Storages/MergeTree/ReplicatedQueueAlterChain.h @@ -44,7 +44,7 @@ public: void addMutationForAlter(int alter_version, std::lock_guard & /*state_lock*/) { if (!queue_state.count(alter_version)) - queue_state.emplace(alter_version, AlterState{true, false}); + queue_state.emplace(alter_version, AlterState{false, false}); else queue_state[alter_version].data_finished = false; } @@ -73,17 +73,19 @@ public: /// queue can be empty after load of finished mutation without move of mutation pointer if (queue_state.empty()) return; - assert(queue_state.count(alter_version)); - if (queue_state[alter_version].metadata_finished) - queue_state.erase(alter_version); - else - queue_state[alter_version].data_finished = true; + if (alter_version >= queue_state.begin()->first) + { + assert(queue_state.count(alter_version)); + if (queue_state[alter_version].metadata_finished) + queue_state.erase(alter_version); + else + queue_state[alter_version].data_finished = true; + } } bool canExecuteDataAlter(int alter_version, std::lock_guard & /*state_lock*/) const { - if (!queue_state.count(alter_version)) return true; diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 3e8ff411ecd..bf52618e727 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -302,7 +302,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree( createNewZooKeeperNodes(); Coordination::Stat metadata_stat; - current_zookeeper->get(zookeeper_path + "/columns", &metadata_stat); + current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat); metadata_version = metadata_stat.version; other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper(); @@ -474,11 +474,9 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo String metadata_str = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat); auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str); auto metadata_diff = old_metadata.checkAndFindDiff(metadata_from_zk, allow_alter); - //metadata_version = metadata_stat.version; Coordination::Stat columns_stat; auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat)); - //columns_version = columns_stat.version; /// TODO(alesap) remove this trash const ColumnsDescription & old_columns = getColumns(); @@ -1438,7 +1436,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry) try { - if (!fetchPart(entry.actual_new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) + String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name; + if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum)) return false; } catch (Exception & e) @@ -2618,10 +2617,6 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr return {}; } } - else - { - entry.actual_new_part_name = entry.new_part_name; - } return replica; } @@ -3436,21 +3431,30 @@ void StorageReplicatedMergeTree::alter( } } - LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); table_lock_holder.release(); std::vector unwaited; if (query_context.getSettingsRef().replication_alter_partitions_sync == 2) + { + LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false); + } else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1) + { + LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); waitForReplicaToProcessLogEntry(replica_name, *alter_entry); + } if (!unwaited.empty()) throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED); if (mutation_znode) + { + LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes."); waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync); + LOG_DEBUG(log, "Data changes applied."); + } } void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) diff --git a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh index c5488a1d3e6..747d394681d 100755 --- a/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh +++ b/dbms/tests/queries/0_stateless/01079_parallel_alter_modify_zookeeper.sh @@ -10,11 +10,11 @@ for i in `seq $REPLICAS`; do done for i in `seq $REPLICAS`; do - $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0" + $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 Int32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000" done -$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)" -$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)" +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, number from numbers(10)" +$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, number from numbers(10, 40)" for i in `seq $REPLICAS`; do $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" @@ -26,17 +26,6 @@ done INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"` -# This is just garbage thread with conflictings alter -# it additionally loads alters "queue". -function garbage_alter_thread() -{ - while true; do - REPLICA=$(($RANDOM % 5 + 1)) - $CLICKHOUSE_CLIENT -n --query "ALTER TABLE concurrent_alter_mt_$REPLICA ADD COLUMN h String DEFAULT '0'; ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN h UInt64; ALTER TABLE concurrent_alter_mt_$REPLICA DROP COLUMN h;"; - done -} - - # This alters mostly requires not only metadata change # but also conversion of data. Also they are all compatible # between each other, so can be executed concurrently. @@ -61,26 +50,42 @@ function insert_thread() while true; do REPLICA=$(($RANDOM % 5 + 1)) VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]} - $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))" + $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, $VALUE)" + sleep 0.$RANDOM + done +} + +# Some select load, to be sure, that our selects work in concurrent execution with alters +function select_thread() +{ + while true; do + REPLICA=$(($RANDOM % 5 + 1)) + $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) FROM concurrent_alter_mt_$REPLICA" 1>/dev/null sleep 0.$RANDOM done } - echo "Starting alters" -export -f garbage_alter_thread; export -f correct_alter_thread; export -f insert_thread; +export -f select_thread; -TIMEOUT=500 +TIMEOUT=60 + + +# Selects should run successfully +timeout $TIMEOUT bash -c select_thread & +timeout $TIMEOUT bash -c select_thread & +timeout $TIMEOUT bash -c select_thread & timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null & + timeout $TIMEOUT bash -c insert_thread 2> /dev/null & timeout $TIMEOUT bash -c insert_thread 2> /dev/null & timeout $TIMEOUT bash -c insert_thread 2> /dev/null & @@ -95,16 +100,14 @@ wait echo "Finishing alters" -# This alter will finish all previous -$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null -while [ $? -ne 0 ]; do - $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null +# This alter will finish all previous, but replica 1 maybe still not up-to-date +while [[ $($CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do + sleep 1 done - for i in `seq $REPLICAS`; do $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i" $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i" $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done - $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" + #$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i" done