Merge branch 'master' into storage-read-query-plan

Nikolai Kochetov 2020-10-07 14:27:21 +03:00
commit 7e02152b50
40 changed files with 596 additions and 99 deletions

.gitmodules

@@ -158,7 +158,7 @@
url = https://github.com/openldap/openldap.git
[submodule "contrib/AMQP-CPP"]
path = contrib/AMQP-CPP
url = https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
url = https://github.com/ClickHouse-Extras/AMQP-CPP.git
[submodule "contrib/cassandra"]
path = contrib/cassandra
url = https://github.com/ClickHouse-Extras/cpp-driver.git

contrib/AMQP-CPP

@@ -1 +1 @@
Subproject commit 1c08399ab0ab9e4042ef8e2bbe9e208e5dcbc13b
Subproject commit d63e1f016582e9faaaf279aa24513087a07bc6e7

@@ -16,6 +16,7 @@ set (SRCS
${LIBRARY_DIR}/src/flags.cpp
${LIBRARY_DIR}/src/linux_tcp/openssl.cpp
${LIBRARY_DIR}/src/linux_tcp/tcpconnection.cpp
${LIBRARY_DIR}/src/inbuffer.cpp
${LIBRARY_DIR}/src/receivedframe.cpp
${LIBRARY_DIR}/src/table.cpp
${LIBRARY_DIR}/src/watchable.cpp

@@ -360,6 +360,89 @@ Extracts a fragment of a string using a regular expression. If haystack doesn't match the pattern regex, an empty string is returned.
Extracts all the fragments of a string using a regular expression. If haystack doesn't match the pattern regex, an empty string is returned. Returns an array of strings consisting of all matches to the regex. In general, the behavior is the same as the extract function (it takes the first subpattern, or the entire expression if there isn't a subpattern).
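For instance, a small illustration of `extractAll` (a hedged sketch based on the description above; the expected output is shown as a comment):

``` sql
-- extractAll returns every match of the whole pattern as a flat array:
SELECT extractAll('abc=111, def=222, ghi=333', '\\d+'); -- expected: ['111','222','333']
```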
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where the first array includes all fragments matching the first group, the second array - matching the second group, etc.
!!! note "Note"
The `extractAllGroupsHorizontal` function is slower than [extractAllGroupsVertical](#extractallgroups-vertical).
**Syntax**
``` sql
extractAllGroupsHorizontal(haystack, pattern)
```
**Parameters**
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` doesn't match the `pattern` regex, an array of empty arrays is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','def','ghi'],['111','222','333']] │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
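The no-match case can be illustrated with a hedged sketch based on the description above (expected output shown as a comment):

``` sql
SELECT extractAllGroupsHorizontal('no matches here', '(\\w+)=(\\w+)'); -- expected: [[],[]] (one empty array per group)
```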
**See also**
- [extractAllGroupsVertical](#extractallgroups-vertical)
## extractAllGroupsVertical {#extractallgroups-vertical}
Matches all groups of the `haystack` string using the `pattern` regular expression. Returns an array of arrays, where each array includes matching fragments from every group. Fragments are grouped in order of appearance in the `haystack`.
**Syntax**
``` sql
extractAllGroupsVertical(haystack, pattern)
```
**Parameters**
- `haystack` — Input string. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — Regular expression with [re2 syntax](https://github.com/google/re2/wiki/Syntax). Must contain groups, each group enclosed in parentheses. If `pattern` contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` doesn't match the `pattern` regex, an empty array is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','111'],['def','222'],['ghi','333']] │
└────────────────────────────────────────────────────────────────────────────────────────┘
```
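By contrast, the no-match case for the vertical variant would look like this (again a hedged sketch based on the description above):

``` sql
SELECT extractAllGroupsVertical('no matches here', '(\\w+)=(\\w+)'); -- expected: [] (no per-match arrays at all)
```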
**See also**
- [extractAllGroupsHorizontal](#extractallgroups-horizontal)
## like(haystack, pattern), haystack LIKE pattern operator {#function-like}
Checks whether a string matches a simple regular expression.
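A minimal illustration, assuming the standard `%` (any substring) and `_` (any single character) wildcard semantics of `LIKE`:

``` sql
SELECT 'Hello, World!' LIKE '%World%';    -- expected: 1
SELECT 'Hello, World!' LIKE 'Hello_World!'; -- expected: 0 (the comma and space are two characters)
```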

@@ -341,6 +341,89 @@ Result:
Extracts all fragments of a string using a regular expression. If haystack does not match the pattern regex, an empty string is returned. Returns an array of strings consisting of all matches of the regular expression. Otherwise, the behavior is the same as for the extract function (as before, the first subpattern is taken, or the whole expression if there is no subpattern).
## extractAllGroupsHorizontal {#extractallgroups-horizontal}
Parses the `haystack` string into fragments matching the groups of the `pattern` regular expression. Returns an array of arrays, where the first array contains all fragments matching the first group of the regular expression, the second array those matching the second group, and so on.
!!! note "Note"
The `extractAllGroupsHorizontal` function works slower than the [extractAllGroupsVertical](#extractallgroups-vertical) function.
**Syntax**
``` sql
extractAllGroupsHorizontal(haystack, pattern)
```
**Parameters**
- `haystack` — The string to parse. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — A regular expression built according to the [re2 syntax](https://github.com/google/re2/wiki/Syntax) rules. The expression must contain groups enclosed in parentheses. If it contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` contains no groups matching the `pattern` regex, an array of empty arrays is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsHorizontal('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','def','ghi'],['111','222','333']] │
└──────────────────────────────────────────────────────────────────────────────────────────┘
```
**See also**
- the [extractAllGroupsVertical](#extractallgroups-vertical) function
## extractAllGroupsVertical {#extractallgroups-vertical}
Parses the `haystack` string into fragments matching the groups of the `pattern` regular expression. Returns an array of arrays, where each array contains one fragment from every group of the regular expression. The fragments are grouped into arrays in the order they appear in the source string.
**Syntax**
``` sql
extractAllGroupsVertical(haystack, pattern)
```
**Parameters**
- `haystack` — The string to parse. Type: [String](../../sql-reference/data-types/string.md).
- `pattern` — A regular expression built according to the [re2 syntax](https://github.com/google/re2/wiki/Syntax) rules. The expression must contain groups enclosed in parentheses. If it contains no groups, an exception is thrown. Type: [String](../../sql-reference/data-types/string.md).
**Returned value**
- Type: [Array](../../sql-reference/data-types/array.md).
If `haystack` contains no groups matching the `pattern` regex, an empty array is returned.
**Example**
Query:
``` sql
SELECT extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')
```
Result:
``` text
┌─extractAllGroupsVertical('abc=111, def=222, ghi=333', '("[^"]+"|\\w+)=("[^"]+"|\\w+)')─┐
│ [['abc','111'],['def','222'],['ghi','333']] │
└────────────────────────────────────────────────────────────────────────────────────────┘
```
**See also**
- the [extractAllGroupsHorizontal](#extractallgroups-horizontal) function
## like(haystack, pattern), haystack LIKE pattern operator {#function-like}
Checks whether a string matches a simple regular expression.

@@ -194,6 +194,7 @@ DiskCacheWrapper::writeFile(const String & path, size_t buf_size, WriteMode mode
auto src_buffer = cache_disk->readFile(path, buf_size, estimated_size, aio_threshold, 0);
auto dst_buffer = DiskDecorator::writeFile(path, buf_size, mode, estimated_size, aio_threshold);
copyData(*src_buffer, *dst_buffer);
dst_buffer->finalize();
},
buf_size);
}

@@ -27,6 +27,7 @@ void copyFile(IDisk & from_disk, const String & from_path, IDisk & to_disk, cons
auto in = from_disk.readFile(from_path);
auto out = to_disk.writeFile(to_path);
copyData(*in, *out);
out->finalize();
}

@@ -1,3 +1,4 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
@@ -123,6 +124,9 @@ void registerDiskS3(DiskFactory & factory)
if (proxy_config)
cfg.perRequestConfiguration = [proxy_config](const auto & request) { return proxy_config->getConfiguration(request); };
cfg.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(
config.getUInt(config_prefix + ".retry_attempts", 10));
auto client = S3::ClientFactory::instance().create(
cfg,
uri.is_virtual_hosted_style,

@@ -27,7 +27,9 @@ namespace
StoragePtr tryGetTable(const ASTPtr & database_and_table, const Context & context)
{
auto table_id = context.resolveStorageID(database_and_table);
auto table_id = context.tryResolveStorageID(database_and_table);
if (!table_id)
return {};
return DatabaseCatalog::instance().tryGetTable(table_id, context);
}

@@ -566,7 +566,7 @@ void PipelineExecutor::executeStepImpl(size_t thread_num, size_t num_threads, st
}
if (node->exception)
finish();
cancel();
if (finished)
break;

@@ -101,7 +101,7 @@ public:
void setQuota(const std::shared_ptr<const EnabledQuota> & quota);
/// Do not allow to change the table while the processors of pipe are alive.
void addTableLock(const TableLockHolder & lock) { holder.table_locks.push_back(lock); }
void addTableLock(TableLockHolder lock) { holder.table_locks.emplace_back(std::move(lock)); }
/// This methods are from QueryPipeline. Needed to make conversion from pipeline to pipe possible.
void addInterpreterContext(std::shared_ptr<Context> context) { holder.interpreter_context.emplace_back(std::move(context)); }
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }

@@ -106,7 +106,7 @@ public:
const Block & getHeader() const { return pipe.getHeader(); }
void addTableLock(const TableLockHolder & lock) { pipe.addTableLock(lock); }
void addTableLock(TableLockHolder lock) { pipe.addTableLock(std::move(lock)); }
void addInterpreterContext(std::shared_ptr<Context> context) { pipe.addInterpreterContext(std::move(context)); }
void addStorageHolder(StoragePtr storage) { pipe.addStorageHolder(std::move(storage)); }
void addQueryPlan(std::unique_ptr<QueryPlan> plan) { pipe.addQueryPlan(std::move(plan)); }

@@ -276,7 +276,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
assertString("ttl format version: 1\n", ttl_infos_buffer);
ttl_infos.read(ttl_infos_buffer);
reservation = data.reserveSpacePreferringTTLRules(sum_files_size, ttl_infos, std::time(nullptr), 0, true);
reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
}
else
reservation = data.reserveSpace(sum_files_size);

@@ -88,6 +88,7 @@ void IMergeTreeDataPart::MinMaxIndex::store(
out_hashing.next();
out_checksums.files[file_name].file_size = out_hashing.count();
out_checksums.files[file_name].file_hash = out_hashing.getHash();
out->finalize();
}
}

@@ -3037,28 +3037,31 @@ ReservationPtr MergeTreeData::tryReserveSpace(UInt64 expected_size, SpacePtr spa
return space->reserve(expected_size);
}
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
ReservationPtr MergeTreeData::reserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
ReservationPtr reservation = tryReserveSpacePreferringTTLRules(metadata_snapshot, expected_size, ttl_infos, time_of_move, min_volume_index, is_insert);
return checkAndReturnReservation(expected_size, std::move(reservation));
}
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
size_t min_volume_index,
bool is_insert) const
{
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
auto metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reservation;
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);

@@ -632,6 +632,7 @@ public:
/// Reserves space at least 1MB preferring best destination according to `ttl_infos`.
ReservationPtr reserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,
@@ -639,6 +640,7 @@ public:
bool is_insert = false) const;
ReservationPtr tryReserveSpacePreferringTTLRules(
const StorageMetadataPtr & metadata_snapshot,
UInt64 expected_size,
const IMergeTreeDataPart::TTLInfos & ttl_infos,
time_t time_of_move,

@@ -182,6 +182,10 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
for (auto ttl_entry_it = descriptions.begin(); ttl_entry_it != descriptions.end(); ++ttl_entry_it)
{
auto ttl_info_it = ttl_info_map.find(ttl_entry_it->result_column);
if (ttl_info_it == ttl_info_map.end())
continue;
time_t ttl_time;
if (use_max)
@@ -190,8 +194,7 @@ std::optional<TTLDescription> selectTTLDescriptionForTTLInfos(const TTLDescripti
ttl_time = ttl_info_it->second.min;
/// Prefer TTL rule which went into action last.
if (ttl_info_it != ttl_info_map.end()
&& ttl_time <= current_time
if (ttl_time <= current_time
&& best_ttl_time <= ttl_time)
{
best_entry_it = ttl_entry_it;

@@ -229,6 +229,8 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
marks.next();
addToChecksums(checksums);
plain_file->finalize();
marks_file->finalize();
if (sync)
{
plain_file->sync();

@@ -17,8 +17,12 @@ namespace
void MergeTreeDataPartWriterOnDisk::Stream::finalize()
{
compressed.next();
plain_file->next();
/// 'compressed_buf' doesn't call next() on underlying buffer ('plain_hashing'). We should do it manually.
plain_hashing.next();
marks.next();
plain_file->finalize();
marks_file->finalize();
}
void MergeTreeDataPartWriterOnDisk::Stream::sync() const
@@ -331,6 +335,7 @@ void MergeTreeDataPartWriterOnDisk::finishPrimaryIndexSerialization(
index_stream->next();
checksums.files["primary.idx"].file_size = index_stream->count();
checksums.files["primary.idx"].file_hash = index_stream->getHash();
index_file_stream->finalize();
if (sync)
index_file_stream->sync();
index_stream = nullptr;

@@ -237,7 +237,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataWriter::writeTempPart(BlockWithPa
updateTTL(ttl_entry, move_ttl_infos, move_ttl_infos.moves_ttl[ttl_entry.result_column], block, false);
NamesAndTypesList columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(expected_size, move_ttl_infos, time(nullptr), 0, true);
ReservationPtr reservation = data.reserveSpacePreferringTTLRules(metadata_snapshot, expected_size, move_ttl_infos, time(nullptr), 0, true);
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
auto new_data_part = data.createPart(

@@ -156,6 +156,7 @@ void MergeTreePartition::store(const Block & partition_key_sample, const DiskPtr
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
checksums.files["partition.dat"].file_hash = out_hashing.getHash();
out->finalize();
}
void MergeTreePartition::create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row)

@@ -148,6 +148,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
count_out_hashing.next();
checksums.files["count.txt"].file_size = count_out_hashing.count();
checksums.files["count.txt"].file_hash = count_out_hashing.getHash();
count_out->finalize();
if (sync)
count_out->sync();
}
@@ -160,6 +161,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
new_part->ttl_infos.write(out_hashing);
checksums.files["ttl.txt"].file_size = out_hashing.count();
checksums.files["ttl.txt"].file_hash = out_hashing.getHash();
out->finalize();
if (sync)
out->sync();
}
@@ -170,6 +172,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
/// Write a file with a description of columns.
auto out = volume->getDisk()->writeFile(part_path + "columns.txt", 4096);
part_columns.writeText(*out);
out->finalize();
if (sync)
out->sync();
}
@@ -178,6 +181,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
{
auto out = volume->getDisk()->writeFile(part_path + IMergeTreeDataPart::DEFAULT_COMPRESSION_CODEC_FILE_NAME, 4096);
DB::writeText(queryToString(default_codec->getFullCodecDesc()), *out);
out->finalize();
}
else
{
@@ -189,6 +193,7 @@ void MergedBlockOutputStream::finalizePartOnDisk(
/// Write file with checksums.
auto out = volume->getDisk()->writeFile(part_path + "checksums.txt", 4096);
checksums.write(*out);
out->finalize();
if (sync)
out->sync();
}

@@ -21,81 +21,64 @@ namespace ErrorCodes
class MemorySource : public SourceWithProgress
{
using InitializerFunc = std::function<void(BlocksList::const_iterator &, size_t &)>;
public:
/// We use range [first, last] which includes right border.
/// Blocks are stored in std::list which may be appended in another thread.
/// We don't use synchronisation here, because elements in range [first, last] won't be modified.
/// We use pointer to the beginning of the list and its current size.
/// We don't need synchronisation in this reader, because while we hold SharedLock on storage,
/// only new elements can be added to the back of the list, so our iterators remain valid
MemorySource(
Names column_names_,
BlocksList::iterator first_,
BlocksList::const_iterator first_,
size_t num_blocks_,
const StorageMemory & storage,
const StorageMetadataPtr & metadata_snapshot)
const StorageMetadataPtr & metadata_snapshot,
InitializerFunc initializer_func_ = [](BlocksList::const_iterator &, size_t &) {})
: SourceWithProgress(metadata_snapshot->getSampleBlockForColumns(column_names_, storage.getVirtuals(), storage.getStorageID()))
, column_names(std::move(column_names_))
, current_it(first_)
, num_blocks(num_blocks_)
, initializer_func(std::move(initializer_func_))
{
}
/// If called, will initialize the number of blocks at first read.
/// It allows reading data which was inserted into the memory table AFTER Storage::read was called.
/// This hack is needed for global subqueries.
void delayInitialization(BlocksList * data_, std::mutex * mutex_)
{
data = data_;
mutex = mutex_;
}
String getName() const override { return "Memory"; }
protected:
Chunk generate() override
{
if (data)
if (!postponed_init_done)
{
std::lock_guard guard(*mutex);
current_it = data->begin();
num_blocks = data->size();
is_finished = num_blocks == 0;
data = nullptr;
mutex = nullptr;
initializer_func(current_it, num_blocks);
postponed_init_done = true;
}
if (is_finished)
{
if (current_block_idx == num_blocks)
return {};
}
else
{
const Block & src = *current_it;
Columns columns;
columns.reserve(column_names.size());
/// Add only required columns to `res`.
for (const auto & name : column_names)
columns.emplace_back(src.getByName(name).column);
const Block & src = *current_it;
Columns columns;
columns.reserve(column_names.size());
++current_block_idx;
/// Add only required columns to `res`.
for (const auto & name : column_names)
columns.push_back(src.getByName(name).column);
if (current_block_idx == num_blocks)
is_finished = true;
else
++current_it;
if (++current_block_idx < num_blocks)
++current_it;
return Chunk(std::move(columns), src.rows());
}
return Chunk(std::move(columns), src.rows());
}
private:
Names column_names;
BlocksList::iterator current_it;
size_t current_block_idx = 0;
size_t num_blocks;
bool is_finished = false;
BlocksList * data = nullptr;
std::mutex * mutex = nullptr;
private:
const Names column_names;
BlocksList::const_iterator current_it;
size_t num_blocks;
size_t current_block_idx = 0;
bool postponed_init_done = false;
InitializerFunc initializer_func;
};
@@ -113,9 +96,18 @@ public:
void write(const Block & block) override
{
const auto size_bytes_diff = block.allocatedBytes();
const auto size_rows_diff = block.rows();
metadata_snapshot->check(block, true);
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
{
std::lock_guard lock(storage.mutex);
storage.data.push_back(block);
storage.total_size_bytes.fetch_add(size_bytes_diff, std::memory_order_relaxed);
storage.total_size_rows.fetch_add(size_rows_diff, std::memory_order_relaxed);
}
}
private:
StorageMemory & storage;
@@ -144,8 +136,6 @@ Pipe StorageMemory::read(
{
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
std::lock_guard lock(mutex);
if (delay_read_for_global_subqueries)
{
/// Note: for global subquery we use single source.
@@ -156,11 +146,22 @@ Pipe StorageMemory::read(
/// set for IN or hash table for JOIN, which can't be done concurrently.
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
auto source = std::make_shared<MemorySource>(column_names, data.begin(), data.size(), *this, metadata_snapshot);
source->delayInitialization(&data, &mutex);
return Pipe(std::move(source));
return Pipe(
std::make_shared<MemorySource>(
column_names, data.end(), 0, *this, metadata_snapshot,
/// This hack is needed for global subqueries.
/// It allows setting up this Source for reading AFTER Storage::read() has been called, just before the actual reading
[this](BlocksList::const_iterator & current_it, size_t & num_blocks)
{
std::lock_guard guard(mutex);
current_it = data.begin();
num_blocks = data.size();
}
));
}
std::lock_guard lock(mutex);
size_t size = data.size();
if (num_streams > size)
@@ -168,7 +169,7 @@ Pipe StorageMemory::read(
Pipes pipes;
BlocksList::iterator it = data.begin();
BlocksList::const_iterator it = data.begin();
size_t offset = 0;
for (size_t stream = 0; stream < num_streams; ++stream)
@@ -201,31 +202,32 @@ void StorageMemory::drop()
{
std::lock_guard lock(mutex);
data.clear();
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
void StorageMemory::truncate(
const ASTPtr &, const StorageMetadataPtr &, const Context &, TableExclusiveLockHolder &)
{
std::lock_guard lock(mutex);
data.clear();
total_size_bytes.store(0, std::memory_order_relaxed);
total_size_rows.store(0, std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalRows() const
{
UInt64 rows = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
rows += buffer.rows();
return rows;
/// All modifications of these counters are done under mutex which automatically guarantees synchronization/consistency
/// When run concurrently we are fine with any value: "before" or "after"
return total_size_rows.load(std::memory_order_relaxed);
}
std::optional<UInt64> StorageMemory::totalBytes() const
{
UInt64 bytes = 0;
std::lock_guard lock(mutex);
for (const auto & buffer : data)
bytes += buffer.allocatedBytes();
return bytes;
return total_size_bytes.load(std::memory_order_relaxed);
}

@@ -1,5 +1,7 @@
#pragma once
#include <atomic>
#include <optional>
#include <mutex>
#include <ext/shared_ptr_helper.h>
@@ -93,6 +95,9 @@ private:
bool delay_read_for_global_subqueries = false;
std::atomic<size_t> total_size_bytes = 0;
std::atomic<size_t> total_size_rows = 0;
protected:
StorageMemory(const StorageID & table_id_, ColumnsDescription columns_description_, ConstraintsDescription constraints_);
};

@@ -300,7 +300,12 @@ struct CurrentlyMergingPartsTagger
StorageMergeTree & storage;
public:
CurrentlyMergingPartsTagger(FutureMergedMutatedPart & future_part_, size_t total_size, StorageMergeTree & storage_, bool is_mutation)
CurrentlyMergingPartsTagger(
FutureMergedMutatedPart & future_part_,
size_t total_size,
StorageMergeTree & storage_,
const StorageMetadataPtr & metadata_snapshot,
bool is_mutation)
: future_part(future_part_), storage(storage_)
{
/// Assume mutex is already locked, because this method is called from mergeTask.
@@ -318,7 +323,7 @@ public:
max_volume_index = std::max(max_volume_index, storage.getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
reserved_space = storage.tryReserveSpacePreferringTTLRules(total_size, ttl_infos, time(nullptr), max_volume_index);
reserved_space = storage.tryReserveSpacePreferringTTLRules(metadata_snapshot, total_size, ttl_infos, time(nullptr), max_volume_index);
}
if (!reserved_space)
{
@@ -729,7 +734,7 @@ bool StorageMergeTree::merge(
return false;
}
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, false);
merging_tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace(future_part.parts), *this, metadata_snapshot, false);
auto table_id = getStorageID();
merge_entry = global_context.getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
}
@@ -870,7 +875,7 @@ bool StorageMergeTree::tryMutatePart()
future_part.name = part->getNewName(new_part_info);
future_part.type = part->getType();
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, true);
tagger.emplace(future_part, MergeTreeDataMergerMutator::estimateNeededDiskSpace({part}), *this, metadata_snapshot, true);
break;
}
}

@@ -1416,11 +1416,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
ttl_infos.update(part_ptr->ttl_infos);
max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->volume->getDisk()));
}
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
auto table_lock = lockForShare(RWLockImpl::NO_QUERY, storage_settings_ptr->lock_acquire_timeout_for_background_operations);
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(
metadata_snapshot, estimated_space_for_merge, ttl_infos, time(nullptr), max_volume_index);
FutureMergedMutatedPart future_merged_part(parts, entry.new_part_type);
if (future_merged_part.name != entry.new_part_name)

@@ -102,7 +102,7 @@
"with_coverage": false
},
{
"compiler": "clang-10",
"compiler": "clang-11",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@@ -134,7 +134,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-darwin",
"compiler": "clang-11-darwin",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@@ -144,7 +144,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-aarch64",
"compiler": "clang-11-aarch64",
"build-type": "",
"sanitizer": "",
"package-type": "binary",
@@ -154,7 +154,7 @@
"with_coverage": false
},
{
"compiler": "clang-10-freebsd",
"compiler": "clang-11-freebsd",
"build-type": "",
"sanitizer": "",
"package-type": "binary",

@@ -0,0 +1,12 @@
<yandex>
<shutdown_wait_unfinished>3</shutdown_wait_unfinished>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>

@@ -0,0 +1,26 @@
<yandex>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<!-- Use custom S3 endpoint -->
<endpoint>http://resolver:8080/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
</yandex>

@@ -0,0 +1,5 @@
<yandex>
<profiles>
<default/>
</profiles>
</yandex>

@@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<tcp_port>9000</tcp_port>
<listen_host>127.0.0.1</listen_host>
<openSSL>
<client>
<cacheSessions>true</cacheSessions>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
<max_concurrent_queries>500</max_concurrent_queries>
<mark_cache_size>5368709120</mark_cache_size>
<path>./clickhouse/</path>
<users_config>users.xml</users_config>
</yandex>

@@ -0,0 +1,49 @@
from bottle import request, route, run, response
# The endpoint can be configured to fail the N-th request attempt with a 500 error.
# In the usual situation it just redirects to the original Minio server.
# Dict holding the number of requests that remain to be failed.
cache = {}
@route('/fail_request/<_request_number>')
def fail_request(_request_number):
request_number = int(_request_number)
if request_number > 0:
cache['request_number'] = request_number
else:
cache.pop('request_number', None)
return 'OK'
# Handle for MultipleObjectsDelete.
@route('/<_bucket>', ['POST'])
def delete(_bucket):
response.set_header("Location", "http://minio1:9001/" + _bucket + "?" + request.query_string)
response.status = 307
return 'Redirected'
@route('/<_bucket>/<_path:path>', ['GET', 'POST', 'PUT', 'DELETE'])
def server(_bucket, _path):
if cache.get('request_number', None):
request_number = cache.pop('request_number') - 1
if request_number > 0:
cache['request_number'] = request_number
else:
response.status = 500
return 'Expected Error'
response.set_header("Location", "http://minio1:9001/" + _bucket + '/' + _path)
response.status = 307
return 'Redirected'
@route('/')
def ping():
return 'OK'
run(host='0.0.0.0', port=8080)

@@ -0,0 +1,117 @@
import logging
import os
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
logging.getLogger().setLevel(logging.INFO)
logging.getLogger().addHandler(logging.StreamHandler())
# Runs custom python-based S3 endpoint.
def run_endpoint(cluster):
logging.info("Starting custom S3 endpoint")
container_id = cluster.get_container_id('resolver')
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(container_id, os.path.join(current_dir, "s3_endpoint", "endpoint.py"), "endpoint.py")
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start
for attempt in range(10):
ping_response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/"], nothrow=True)
if ping_response != 'OK':
if attempt == 9:
assert ping_response == 'OK', 'Expected "OK", but got "{}"'.format(ping_response)
else:
time.sleep(1)
else:
break
logging.info("S3 endpoint started")
def fail_request(cluster, request):
response = cluster.exec_in_container(cluster.get_container_id('resolver'),
["curl", "-s", "http://resolver:8080/fail_request/{}".format(request)])
assert response == 'OK', 'Expected "OK", but got "{}"'.format(response)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance("node",
main_configs=["configs/config.d/log_conf.xml", "configs/config.d/storage_conf.xml"],
with_minio=True)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_endpoint(cluster)
yield cluster
finally:
cluster.shutdown()
@pytest.fixture(autouse=True)
def drop_table(cluster):
yield
node = cluster.instances["node"]
node.query("DROP TABLE IF EXISTS s3_failover_test NO DELAY")
# The S3 request for the corresponding part file write will be failed.
FILES_PER_PART_BASE = 5 # partition.dat, default_compression_codec.txt, count.txt, columns.txt, checksums.txt
FILES_PER_PART_WIDE = FILES_PER_PART_BASE + 1 + 1 + 3 * 2 # Primary index, MinMax, Mark and data file for column(s)
FILES_PER_PART_COMPACT = FILES_PER_PART_BASE + 1 + 1 + 2
@pytest.mark.parametrize(
"min_bytes_for_wide_part,request_count",
[
(0, FILES_PER_PART_WIDE),
(1024 * 1024, FILES_PER_PART_COMPACT)
]
)
def test_write_failover(cluster, min_bytes_for_wide_part, request_count):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
dt Date,
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
PARTITION BY dt
SETTINGS storage_policy='s3', min_bytes_for_wide_part={}
"""
.format(min_bytes_for_wide_part)
)
for request in range(request_count + 1):
# Fail N-th request to S3.
fail_request(cluster, request + 1)
data = "('2020-03-01',0,'data'),('2020-03-01',1,'data')"
positive = request == request_count
try:
node.query("INSERT INTO s3_failover_test VALUES {}".format(data))
assert positive, "Insert query should be failed, request {}".format(request)
except QueryRuntimeException as e:
assert not positive, "Insert query shouldn't be failed, request {}".format(request)
assert str(e).find("Expected Error") != -1, "Unexpected error {}".format(str(e))
if positive:
# Disable request failing.
fail_request(cluster, 0)
assert node.query("CHECK TABLE s3_failover_test") == '1\n'
assert node.query("SELECT * FROM s3_failover_test FORMAT Values") == data

@@ -0,0 +1,36 @@
CREATE DATABASE IF NOT EXISTS shard_0;
CREATE DATABASE IF NOT EXISTS shard_1;
CREATE DATABASE IF NOT EXISTS main_01487;
CREATE DATABASE IF NOT EXISTS test_01487;
USE main_01487;
DROP TABLE IF EXISTS shard_0.l;
DROP TABLE IF EXISTS shard_1.l;
DROP TABLE IF EXISTS d;
DROP TABLE IF EXISTS t;
CREATE TABLE shard_0.l (value UInt8) ENGINE = MergeTree ORDER BY value;
CREATE TABLE shard_1.l (value UInt8) ENGINE = MergeTree ORDER BY value;
CREATE TABLE t (value UInt8) ENGINE = Memory;
INSERT INTO shard_0.l VALUES (0);
INSERT INTO shard_1.l VALUES (1);
INSERT INTO t VALUES (0), (1), (2);
CREATE TABLE d AS t ENGINE = Distributed(test_cluster_two_shards_different_databases, currentDatabase(), t);
USE test_01487;
DROP DATABASE test_01487;
SELECT * FROM main_01487.d WHERE value IN (SELECT l.value FROM l) ORDER BY value;
USE main_01487;
DROP TABLE IF EXISTS shard_0.l;
DROP TABLE IF EXISTS shard_1.l;
DROP TABLE IF EXISTS d;
DROP TABLE IF EXISTS t;
DROP DATABASE shard_0;
DROP DATABASE shard_1;

@@ -0,0 +1,21 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
# _shard_num:
# 1 on 127.2
# 2 on 127.3
# max_block_size to fail faster
# max_memory_usage/_shard_num/repeat() will allow failure on the first shard earlier.
opts=(
--max_memory_usage=3G
--max_block_size=50
--max_threads=1
--max_distributed_connections=2
)
${CLICKHOUSE_CLIENT} "${opts[@]}" -q "SELECT groupArray(repeat('a', 1000*_shard_num)), number%100000 k from remote('127.{2,3}', system.numbers) GROUP BY k LIMIT 10e6" |& {
# The query should fail earlier on 127.3, and 127.2 should not even reach the memory limit exceeded error.
fgrep -q 'DB::Exception: Received from 127.3:9000. DB::Exception: Memory limit (for query) exceeded:'
# Otherwise it would get the exception from 127.2:9000 and fail.
}