mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
Revert "Fix 'Block structure mismatch' on concurrent ALTER and INSERTs in Buffer table (#55995)"
This reverts commit b65c498016.
This commit is contained in:
parent
73c5312392
commit
9aff0a8872
@ -165,8 +165,7 @@ public:
|
|||||||
: ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
|
: ISource(storage_snapshot->getSampleBlockForColumns(column_names_))
|
||||||
, column_names_and_types(storage_snapshot->getColumnsByNames(
|
, column_names_and_types(storage_snapshot->getColumnsByNames(
|
||||||
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns(), column_names_))
|
GetColumnsOptions(GetColumnsOptions::All).withSubcolumns(), column_names_))
|
||||||
, buffer(buffer_)
|
, buffer(buffer_) {}
|
||||||
, metadata_version(storage_snapshot->metadata->metadata_version) {}
|
|
||||||
|
|
||||||
String getName() const override { return "Buffer"; }
|
String getName() const override { return "Buffer"; }
|
||||||
|
|
||||||
@ -181,7 +180,7 @@ protected:
|
|||||||
|
|
||||||
std::unique_lock lock(buffer.lockForReading());
|
std::unique_lock lock(buffer.lockForReading());
|
||||||
|
|
||||||
if (!buffer.data.rows() || buffer.metadata_version != metadata_version)
|
if (!buffer.data.rows())
|
||||||
return res;
|
return res;
|
||||||
|
|
||||||
Columns columns;
|
Columns columns;
|
||||||
@ -199,7 +198,6 @@ protected:
|
|||||||
private:
|
private:
|
||||||
NamesAndTypesList column_names_and_types;
|
NamesAndTypesList column_names_and_types;
|
||||||
StorageBuffer::Buffer & buffer;
|
StorageBuffer::Buffer & buffer;
|
||||||
int32_t metadata_version;
|
|
||||||
bool has_been_read = false;
|
bool has_been_read = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -617,7 +615,7 @@ public:
|
|||||||
least_busy_buffer = &storage.buffers[start_shard_num];
|
least_busy_buffer = &storage.buffers[start_shard_num];
|
||||||
least_busy_lock = least_busy_buffer->lockForWriting();
|
least_busy_lock = least_busy_buffer->lockForWriting();
|
||||||
}
|
}
|
||||||
insertIntoBuffer(block, *least_busy_buffer, metadata_snapshot->metadata_version);
|
insertIntoBuffer(block, *least_busy_buffer);
|
||||||
least_busy_lock.unlock();
|
least_busy_lock.unlock();
|
||||||
|
|
||||||
storage.reschedule();
|
storage.reschedule();
|
||||||
@ -626,15 +624,14 @@ private:
|
|||||||
StorageBuffer & storage;
|
StorageBuffer & storage;
|
||||||
StorageMetadataPtr metadata_snapshot;
|
StorageMetadataPtr metadata_snapshot;
|
||||||
|
|
||||||
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer, int32_t metadata_version)
|
void insertIntoBuffer(const Block & block, StorageBuffer::Buffer & buffer)
|
||||||
{
|
{
|
||||||
time_t current_time = time(nullptr);
|
time_t current_time = time(nullptr);
|
||||||
|
|
||||||
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
|
/// Sort the columns in the block. This is necessary to make it easier to concatenate the blocks later.
|
||||||
Block sorted_block = block.sortColumns();
|
Block sorted_block = block.sortColumns();
|
||||||
|
|
||||||
if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()) ||
|
if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
|
||||||
buffer.metadata_version != metadata_version)
|
|
||||||
{
|
{
|
||||||
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
|
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
|
||||||
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
|
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
|
||||||
@ -642,7 +639,6 @@ private:
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
|
storage.flushBuffer(buffer, false /* check_thresholds */, true /* locked */);
|
||||||
buffer.metadata_version = metadata_version;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!buffer.first_write_time)
|
if (!buffer.first_write_time)
|
||||||
@ -1066,12 +1062,13 @@ void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context
|
|||||||
checkAlterIsPossible(params, local_context);
|
checkAlterIsPossible(params, local_context);
|
||||||
auto metadata_snapshot = getInMemoryMetadataPtr();
|
auto metadata_snapshot = getInMemoryMetadataPtr();
|
||||||
|
|
||||||
/// Flush buffers to the storage because BufferSource skips buffers with old metadata_version.
|
/// Flush all buffers to storages, so that no non-empty blocks of the old
|
||||||
|
/// structure remain. Structure of empty blocks will be updated during first
|
||||||
|
/// insert.
|
||||||
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, local_context);
|
optimize({} /*query*/, metadata_snapshot, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, {}, false /*cleanup*/, local_context);
|
||||||
|
|
||||||
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
|
StorageInMemoryMetadata new_metadata = *metadata_snapshot;
|
||||||
params.apply(new_metadata, local_context);
|
params.apply(new_metadata, local_context);
|
||||||
new_metadata.metadata_version += 1;
|
|
||||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
|
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
|
||||||
setInMemoryMetadata(new_metadata);
|
setInMemoryMetadata(new_metadata);
|
||||||
}
|
}
|
||||||
|
@ -128,18 +128,6 @@ private:
|
|||||||
time_t first_write_time = 0;
|
time_t first_write_time = 0;
|
||||||
Block data;
|
Block data;
|
||||||
|
|
||||||
/// Schema version, checked to avoid mixing blocks with different sets of columns, from
|
|
||||||
/// before and after an ALTER. There are some remaining mild problems if an ALTER happens
|
|
||||||
/// in the middle of a long-running INSERT:
|
|
||||||
/// * The data produced by the INSERT after the ALTER is not visible to SELECTs until flushed.
|
|
||||||
/// That's because BufferSource skips buffers with old metadata_version instead of converting
|
|
||||||
/// them to the latest schema, for simplicity.
|
|
||||||
/// * If there are concurrent INSERTs, some of which started before the ALTER and some started
|
|
||||||
/// after, then the buffer's metadata_version will oscillate back and forth between the two
|
|
||||||
/// schemas, flushing the buffer each time. This is probably fine because long-running INSERTs
|
|
||||||
/// usually don't produce lots of small blocks.
|
|
||||||
int32_t metadata_version = 0;
|
|
||||||
|
|
||||||
std::unique_lock<std::mutex> lockForReading() const;
|
std::unique_lock<std::mutex> lockForReading() const;
|
||||||
std::unique_lock<std::mutex> lockForWriting() const;
|
std::unique_lock<std::mutex> lockForWriting() const;
|
||||||
std::unique_lock<std::mutex> tryLock() const;
|
std::unique_lock<std::mutex> tryLock() const;
|
||||||
|
@ -1,10 +0,0 @@
|
|||||||
0
|
|
||||||
1
|
|
||||||
2 bobr
|
|
||||||
0
|
|
||||||
1
|
|
||||||
1
|
|
||||||
1
|
|
||||||
1
|
|
||||||
1
|
|
||||||
2 bobr
|
|
@ -1,34 +0,0 @@
|
|||||||
#!/usr/bin/env bash
|
|
||||||
|
|
||||||
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
|
||||||
# shellcheck source=../shell_config.sh
|
|
||||||
. "$CURDIR"/../shell_config.sh
|
|
||||||
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "drop table if exists 02900_buffer"
|
|
||||||
$CLICKHOUSE_CLIENT -q "drop table if exists 02900_destination"
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "create table 02900_destination (k Int8, v String) engine Memory"
|
|
||||||
$CLICKHOUSE_CLIENT -q "create table 02900_buffer (k Int8) engine Buffer(currentDatabase(), '02900_destination', 1, 1000, 1000, 10000, 10000, 1000000, 1000000)"
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k) select 0"
|
|
||||||
|
|
||||||
# Start a long-running INSERT that uses the old schema.
|
|
||||||
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k) select sleepEachRow(1)+1 from numbers(5) settings max_block_size=1, max_insert_block_size=1, min_insert_block_size_rows=0, min_insert_block_size_bytes=0" &
|
|
||||||
|
|
||||||
sleep 1
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "alter table 02900_buffer add column v String"
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "insert into 02900_buffer (k, v) select 2, 'bobr'"
|
|
||||||
|
|
||||||
wait
|
|
||||||
|
|
||||||
# The data produced by the long-running INSERT after the ALTER is not visible until flushed.
|
|
||||||
$CLICKHOUSE_CLIENT -q "select k, any(v) from 02900_buffer group by k order by k"
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "optimize table 02900_buffer"
|
|
||||||
$CLICKHOUSE_CLIENT -q "select * from 02900_buffer order by k"
|
|
||||||
|
|
||||||
$CLICKHOUSE_CLIENT -q "drop table 02900_buffer"
|
|
||||||
$CLICKHOUSE_CLIENT -q "drop table 02900_destination"
|
|
Loading…
Reference in New Issue
Block a user