mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Better naming
This commit is contained in:
parent
78d42142cf
commit
199c22c363
@ -48,18 +48,10 @@ ASTPtr convertRequiredExpressions(Block & block, const NamesAndTypesList & requi
|
|||||||
{
|
{
|
||||||
if (!block.has(required_column.name))
|
if (!block.has(required_column.name))
|
||||||
continue;
|
continue;
|
||||||
//throw Exception("Required conversion of column " + required_column.name + " which is absent in block. It's a bug", ErrorCodes::LOGICAL_ERROR);
|
|
||||||
|
|
||||||
auto column_in_block = block.getByName(required_column.name);
|
auto column_in_block = block.getByName(required_column.name);
|
||||||
//std::cerr << "Looking at:" << required_column.name << std::endl;
|
|
||||||
//std::cerr << "In block type:" << column_in_block.type->getName() << std::endl;
|
|
||||||
//std::cerr << "Required type:" << required_column.type->getName() << std::endl;
|
|
||||||
if (column_in_block.type->equals(*required_column.type))
|
if (column_in_block.type->equals(*required_column.type))
|
||||||
{
|
|
||||||
//std::cerr << "TYPES ARE SAME\n";
|
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
//std::cerr << "TYPES ARE DIFFERENT\n";
|
|
||||||
|
|
||||||
auto cast_func = makeASTFunction(
|
auto cast_func = makeASTFunction(
|
||||||
"CAST", std::make_shared<ASTIdentifier>(required_column.name), std::make_shared<ASTLiteral>(required_column.type->getName()));
|
"CAST", std::make_shared<ASTIdentifier>(required_column.name), std::make_shared<ASTLiteral>(required_column.type->getName()));
|
||||||
@ -114,9 +106,7 @@ void executeExpressionsOnBlock(
|
|||||||
copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared<DataTypeUInt8>(), "__dummy"});
|
copy_block.insert({DataTypeUInt8().createColumnConst(rows_was, 0u), std::make_shared<DataTypeUInt8>(), "__dummy"});
|
||||||
}
|
}
|
||||||
|
|
||||||
//std::cerr << "Block before expression:" << copy_block.dumpStructure() << std::endl;
|
|
||||||
expression_analyzer.getActions(true)->execute(copy_block);
|
expression_analyzer.getActions(true)->execute(copy_block);
|
||||||
//std::cerr << "Block after expression:" << copy_block.dumpStructure() << std::endl;
|
|
||||||
|
|
||||||
/// move evaluated columns to the original block, materializing them at the same time
|
/// move evaluated columns to the original block, materializing them at the same time
|
||||||
size_t pos = 0;
|
size_t pos = 0;
|
||||||
@ -140,8 +130,6 @@ void executeExpressionsOnBlock(
|
|||||||
void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context)
|
void performRequiredConversions(Block & block, const NamesAndTypesList & required_columns, const Context & context)
|
||||||
{
|
{
|
||||||
ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns);
|
ASTPtr conversion_expr_list = convertRequiredExpressions(block, required_columns);
|
||||||
//std::cerr << queryToString(conversion_expr_list) << std::endl;
|
|
||||||
//std::cerr << "Block:" << block.dumpStructure() << std::endl;
|
|
||||||
if (conversion_expr_list->children.empty())
|
if (conversion_expr_list->children.empty())
|
||||||
return;
|
return;
|
||||||
executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context);
|
executeExpressionsOnBlock(block, conversion_expr_list, true, required_columns, context);
|
||||||
|
@ -17,6 +17,7 @@ void evaluateMissingDefaults(Block & block,
|
|||||||
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
|
const std::unordered_map<std::string, ColumnDefault> & column_defaults,
|
||||||
const Context & context, bool save_unneeded_columns = true);
|
const Context & context, bool save_unneeded_columns = true);
|
||||||
|
|
||||||
|
/// Tries to convert columns in block to required_columns
|
||||||
void performRequiredConversions(Block & block,
|
void performRequiredConversions(Block & block,
|
||||||
const NamesAndTypesList & required_columns,
|
const NamesAndTypesList & required_columns,
|
||||||
const Context & context);
|
const Context & context);
|
||||||
|
@ -56,7 +56,6 @@ MergeTreeReader::MergeTreeReader(
|
|||||||
, mmap_threshold(mmap_threshold_)
|
, mmap_threshold(mmap_threshold_)
|
||||||
, max_read_buffer_size(max_read_buffer_size_)
|
, max_read_buffer_size(max_read_buffer_size_)
|
||||||
{
|
{
|
||||||
////std::cerr << "Merge tree reader created for part:" << data_part->name << std::endl;
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
for (const NameAndTypePair & column_from_part : data_part->columns)
|
for (const NameAndTypePair & column_from_part : data_part->columns)
|
||||||
@ -66,22 +65,12 @@ MergeTreeReader::MergeTreeReader(
|
|||||||
|
|
||||||
for (const NameAndTypePair & column : columns)
|
for (const NameAndTypePair & column : columns)
|
||||||
{
|
{
|
||||||
////std::cerr << "Column name to read:" << column.name << std::endl;
|
|
||||||
if (columns_from_part.count(column.name))
|
if (columns_from_part.count(column.name))
|
||||||
{
|
|
||||||
////std::cerr << "With type:" << columns_from_part[column.name]->getName() << std::endl;
|
|
||||||
////std::cerr << "Original type:" << column.type->getName() << std::endl;
|
|
||||||
|
|
||||||
addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_);
|
addStreams(column.name, *columns_from_part[column.name], profile_callback_, clock_type_);
|
||||||
}
|
|
||||||
else
|
else
|
||||||
{
|
|
||||||
////std::cerr << "Original type:" << column.type->getName() << std::endl;
|
|
||||||
addStreams(column.name, *column.type, profile_callback_, clock_type_);
|
addStreams(column.name, *column.type, profile_callback_, clock_type_);
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
////std::cerr << "COLUMNS IN CONSTRUCTOR:" << columns.toString() << std::endl;
|
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@ -399,7 +388,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
|
|||||||
size_t num_columns = columns.size();
|
size_t num_columns = columns.size();
|
||||||
|
|
||||||
if (res_columns.size() != num_columns)
|
if (res_columns.size() != num_columns)
|
||||||
throw Exception("invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. "
|
throw Exception("Invalid number of columns passed to MergeTreeReader::evaluateMissingDefaults. "
|
||||||
"Expected " + toString(num_columns) + ", "
|
"Expected " + toString(num_columns) + ", "
|
||||||
"got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);
|
"got " + toString(res_columns.size()), ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
@ -414,9 +403,7 @@ void MergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
|
|||||||
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
||||||
}
|
}
|
||||||
|
|
||||||
//std::cerr << "additional columns before:" << additional_columns.dumpStructure() << std::endl;
|
|
||||||
DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context);
|
DB::evaluateMissingDefaults(additional_columns, columns, storage.getColumns().getDefaults(), storage.global_context);
|
||||||
//std::cerr << "additional columns after:" << additional_columns.dumpStructure() << std::endl;
|
|
||||||
|
|
||||||
/// Move columns from block.
|
/// Move columns from block.
|
||||||
name_and_type = columns.begin();
|
name_and_type = columns.begin();
|
||||||
@ -440,7 +427,7 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns)
|
|||||||
if (res_columns.size() != num_columns)
|
if (res_columns.size() != num_columns)
|
||||||
{
|
{
|
||||||
throw Exception(
|
throw Exception(
|
||||||
"invalid number of columns passed to MergeTreeReader::performRequiredConversions. "
|
"Invalid number of columns passed to MergeTreeReader::performRequiredConversions. "
|
||||||
"Expected "
|
"Expected "
|
||||||
+ toString(num_columns)
|
+ toString(num_columns)
|
||||||
+ ", "
|
+ ", "
|
||||||
@ -451,32 +438,19 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns)
|
|||||||
|
|
||||||
Block copy_block;
|
Block copy_block;
|
||||||
auto name_and_type = columns.begin();
|
auto name_and_type = columns.begin();
|
||||||
////std::cerr << "DATAPART NAMES AND TYPES:" << data_part->columns.toString() << std::endl;
|
|
||||||
////std::cerr << "REQUIRED COLUMNS NAMES AND TYPES:" << columns.toString() << std::endl;
|
|
||||||
////std::cerr << "RES COLUMNS SIZE:" << res_columns.size() << std::endl;
|
|
||||||
////std::cerr << "RES COLUMNS STRUCTURE:\n";
|
|
||||||
//for (const auto & column : res_columns)
|
|
||||||
//{
|
|
||||||
// //std::cerr << column->dumpStructure() << std::endl;
|
|
||||||
//}
|
|
||||||
|
|
||||||
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
|
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
|
||||||
{
|
{
|
||||||
////std::cerr << "POS:" << pos << std::endl;
|
|
||||||
if (res_columns[pos] == nullptr)
|
if (res_columns[pos] == nullptr)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
////std::cerr << "POS NAME:" << name_and_type->name << std::endl;
|
|
||||||
////std::cerr << "POS TYPE:" << name_and_type->type->getName() << std::endl;
|
|
||||||
if (columns_from_part.count(name_and_type->name))
|
if (columns_from_part.count(name_and_type->name))
|
||||||
copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name});
|
copy_block.insert({res_columns[pos], columns_from_part[name_and_type->name], name_and_type->name});
|
||||||
else
|
else
|
||||||
copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
copy_block.insert({res_columns[pos], name_and_type->type, name_and_type->name});
|
||||||
}
|
}
|
||||||
|
|
||||||
////std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl;
|
|
||||||
DB::performRequiredConversions(copy_block, columns, storage.global_context);
|
DB::performRequiredConversions(copy_block, columns, storage.global_context);
|
||||||
////std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl;
|
|
||||||
|
|
||||||
/// Move columns from block.
|
/// Move columns from block.
|
||||||
name_and_type = columns.begin();
|
name_and_type = columns.begin();
|
||||||
|
@ -45,7 +45,8 @@ public:
|
|||||||
/// Evaluate defaulted columns if necessary.
|
/// Evaluate defaulted columns if necessary.
|
||||||
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns);
|
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns);
|
||||||
|
|
||||||
/// Perform conversions TODO(alesap)
|
/// If part metadata is not equal to storage metadata, than
|
||||||
|
/// try to perform conversions of columns.
|
||||||
void performRequiredConversions(Columns & res_columns);
|
void performRequiredConversions(Columns & res_columns);
|
||||||
|
|
||||||
const NamesAndTypesList & getColumns() const { return columns; }
|
const NamesAndTypesList & getColumns() const { return columns; }
|
||||||
|
@ -44,7 +44,7 @@ public:
|
|||||||
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
|
void addMutationForAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
|
||||||
{
|
{
|
||||||
if (!queue_state.count(alter_version))
|
if (!queue_state.count(alter_version))
|
||||||
queue_state.emplace(alter_version, AlterState{true, false});
|
queue_state.emplace(alter_version, AlterState{false, false});
|
||||||
else
|
else
|
||||||
queue_state[alter_version].data_finished = false;
|
queue_state[alter_version].data_finished = false;
|
||||||
}
|
}
|
||||||
@ -73,17 +73,19 @@ public:
|
|||||||
/// queue can be empty after load of finished mutation without move of mutation pointer
|
/// queue can be empty after load of finished mutation without move of mutation pointer
|
||||||
if (queue_state.empty())
|
if (queue_state.empty())
|
||||||
return;
|
return;
|
||||||
assert(queue_state.count(alter_version));
|
|
||||||
|
|
||||||
|
if (alter_version >= queue_state.begin()->first)
|
||||||
|
{
|
||||||
|
assert(queue_state.count(alter_version));
|
||||||
if (queue_state[alter_version].metadata_finished)
|
if (queue_state[alter_version].metadata_finished)
|
||||||
queue_state.erase(alter_version);
|
queue_state.erase(alter_version);
|
||||||
else
|
else
|
||||||
queue_state[alter_version].data_finished = true;
|
queue_state[alter_version].data_finished = true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
|
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
|
||||||
{
|
{
|
||||||
|
|
||||||
if (!queue_state.count(alter_version))
|
if (!queue_state.count(alter_version))
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
|
@ -302,7 +302,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
|
|||||||
createNewZooKeeperNodes();
|
createNewZooKeeperNodes();
|
||||||
|
|
||||||
Coordination::Stat metadata_stat;
|
Coordination::Stat metadata_stat;
|
||||||
current_zookeeper->get(zookeeper_path + "/columns", &metadata_stat);
|
current_zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
|
||||||
metadata_version = metadata_stat.version;
|
metadata_version = metadata_stat.version;
|
||||||
|
|
||||||
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
|
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
|
||||||
@ -474,11 +474,9 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
|
|||||||
String metadata_str = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
|
String metadata_str = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
|
||||||
auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
|
auto metadata_from_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
|
||||||
auto metadata_diff = old_metadata.checkAndFindDiff(metadata_from_zk, allow_alter);
|
auto metadata_diff = old_metadata.checkAndFindDiff(metadata_from_zk, allow_alter);
|
||||||
//metadata_version = metadata_stat.version;
|
|
||||||
|
|
||||||
Coordination::Stat columns_stat;
|
Coordination::Stat columns_stat;
|
||||||
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat));
|
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat));
|
||||||
//columns_version = columns_stat.version;
|
|
||||||
|
|
||||||
/// TODO(alesap) remove this trash
|
/// TODO(alesap) remove this trash
|
||||||
const ColumnsDescription & old_columns = getColumns();
|
const ColumnsDescription & old_columns = getColumns();
|
||||||
@ -1438,7 +1436,8 @@ bool StorageReplicatedMergeTree::executeFetch(LogEntry & entry)
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!fetchPart(entry.actual_new_part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
|
String part_name = entry.actual_new_part_name.empty() ? entry.new_part_name : entry.actual_new_part_name;
|
||||||
|
if (!fetchPart(part_name, zookeeper_path + "/replicas/" + replica, false, entry.quorum))
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
catch (Exception & e)
|
catch (Exception & e)
|
||||||
@ -2618,10 +2617,6 @@ String StorageReplicatedMergeTree::findReplicaHavingCoveringPart(LogEntry & entr
|
|||||||
return {};
|
return {};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
entry.actual_new_part_name = entry.new_part_name;
|
|
||||||
}
|
|
||||||
|
|
||||||
return replica;
|
return replica;
|
||||||
}
|
}
|
||||||
@ -3436,21 +3431,30 @@ void StorageReplicatedMergeTree::alter(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
|
|
||||||
|
|
||||||
table_lock_holder.release();
|
table_lock_holder.release();
|
||||||
|
|
||||||
std::vector<String> unwaited;
|
std::vector<String> unwaited;
|
||||||
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
|
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
|
||||||
unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false);
|
unwaited = waitForAllReplicasToProcessLogEntry(*alter_entry, false);
|
||||||
|
}
|
||||||
else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
|
else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
|
||||||
waitForReplicaToProcessLogEntry(replica_name, *alter_entry);
|
waitForReplicaToProcessLogEntry(replica_name, *alter_entry);
|
||||||
|
}
|
||||||
|
|
||||||
if (!unwaited.empty())
|
if (!unwaited.empty())
|
||||||
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
|
throw Exception("Some replicas doesn't finish metadata alter", ErrorCodes::UNFINISHED);
|
||||||
|
|
||||||
if (mutation_znode)
|
if (mutation_znode)
|
||||||
|
{
|
||||||
|
LOG_DEBUG(log, "Metadata changes applied. Will wait for data changes.");
|
||||||
waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync);
|
waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync);
|
||||||
|
LOG_DEBUG(log, "Data changes applied.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
|
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
|
||||||
|
@ -10,11 +10,11 @@ for i in `seq $REPLICAS`; do
|
|||||||
done
|
done
|
||||||
|
|
||||||
for i in `seq $REPLICAS`; do
|
for i in `seq $REPLICAS`; do
|
||||||
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0"
|
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 Int32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0,max_replicated_merges_in_queue=1000"
|
||||||
done
|
done
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, number from numbers(10)"
|
||||||
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)"
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, number from numbers(10, 40)"
|
||||||
|
|
||||||
for i in `seq $REPLICAS`; do
|
for i in `seq $REPLICAS`; do
|
||||||
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
|
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
|
||||||
@ -26,17 +26,6 @@ done
|
|||||||
|
|
||||||
INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"`
|
INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1"`
|
||||||
|
|
||||||
# This is just garbage thread with conflictings alter
|
|
||||||
# it additionally loads alters "queue".
|
|
||||||
function garbage_alter_thread()
|
|
||||||
{
|
|
||||||
while true; do
|
|
||||||
REPLICA=$(($RANDOM % 5 + 1))
|
|
||||||
$CLICKHOUSE_CLIENT -n --query "ALTER TABLE concurrent_alter_mt_$REPLICA ADD COLUMN h String DEFAULT '0'; ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN h UInt64; ALTER TABLE concurrent_alter_mt_$REPLICA DROP COLUMN h;";
|
|
||||||
done
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# This alters mostly requires not only metadata change
|
# This alters mostly requires not only metadata change
|
||||||
# but also conversion of data. Also they are all compatible
|
# but also conversion of data. Also they are all compatible
|
||||||
# between each other, so can be executed concurrently.
|
# between each other, so can be executed concurrently.
|
||||||
@ -61,26 +50,42 @@ function insert_thread()
|
|||||||
while true; do
|
while true; do
|
||||||
REPLICA=$(($RANDOM % 5 + 1))
|
REPLICA=$(($RANDOM % 5 + 1))
|
||||||
VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]}
|
VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]}
|
||||||
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))"
|
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, $VALUE)"
|
||||||
|
sleep 0.$RANDOM
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
# Some select load, to be sure, that our selects work in concurrent execution with alters
|
||||||
|
function select_thread()
|
||||||
|
{
|
||||||
|
while true; do
|
||||||
|
REPLICA=$(($RANDOM % 5 + 1))
|
||||||
|
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) FROM concurrent_alter_mt_$REPLICA" 1>/dev/null
|
||||||
sleep 0.$RANDOM
|
sleep 0.$RANDOM
|
||||||
done
|
done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
echo "Starting alters"
|
echo "Starting alters"
|
||||||
export -f garbage_alter_thread;
|
|
||||||
export -f correct_alter_thread;
|
export -f correct_alter_thread;
|
||||||
export -f insert_thread;
|
export -f insert_thread;
|
||||||
|
export -f select_thread;
|
||||||
|
|
||||||
|
|
||||||
TIMEOUT=500
|
TIMEOUT=60
|
||||||
|
|
||||||
|
|
||||||
|
# Selects should run successfully
|
||||||
|
timeout $TIMEOUT bash -c select_thread &
|
||||||
|
timeout $TIMEOUT bash -c select_thread &
|
||||||
|
timeout $TIMEOUT bash -c select_thread &
|
||||||
|
|
||||||
|
|
||||||
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
||||||
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
||||||
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
|
||||||
|
|
||||||
|
|
||||||
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
|
||||||
@ -95,16 +100,14 @@ wait
|
|||||||
|
|
||||||
echo "Finishing alters"
|
echo "Finishing alters"
|
||||||
|
|
||||||
# This alter will finish all previous
|
# This alter will finish all previous, but replica 1 maybe still not up-to-date
|
||||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null
|
while [[ $($CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>&1) ]]; do
|
||||||
while [ $? -ne 0 ]; do
|
sleep 1
|
||||||
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null
|
|
||||||
done
|
done
|
||||||
|
|
||||||
|
|
||||||
for i in `seq $REPLICAS`; do
|
for i in `seq $REPLICAS`; do
|
||||||
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
|
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
|
||||||
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
|
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
|
||||||
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done
|
$CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0" # all mutations have to be done
|
||||||
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
|
#$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
|
||||||
done
|
done
|
||||||
|
Loading…
Reference in New Issue
Block a user