Merge branch 'master' into master

This commit is contained in:
Han Fei 2023-05-03 15:17:10 +02:00 committed by GitHub
commit 19048ed3fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 545 additions and 196 deletions

3
.gitmodules vendored
View File

@ -253,9 +253,6 @@
[submodule "contrib/qpl"]
path = contrib/qpl
url = https://github.com/intel/qpl
[submodule "contrib/idxd-config"]
path = contrib/idxd-config
url = https://github.com/intel/idxd-config
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash

1
contrib/idxd-config vendored

@ -1 +0,0 @@
Subproject commit f6605c41a735e3fdfef2d2d18655a33af6490b99

View File

@ -439,6 +439,50 @@ Syntax: `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions,
- `number_of_hash_functions` — The number of hash functions used in the Bloom filter.
- `random_seed` — The seed for Bloom filter hash functions.
Users can create [UDF](/docs/en/sql-reference/statements/create/function.md) to estimate the parameters set of `ngrambf_v1`. Query statements are as follows:
```sql
CREATE FUNCTION bfEstimateFunctions [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, size_of_bloom_filter_in_bits) -> round((size_of_bloom_filter_in_bits / total_nubmer_of_all_grams) * log(2));
CREATE FUNCTION bfEstimateBmSize [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, probability_of_false_positives) -> ceil((total_nubmer_of_all_grams * log(probability_of_false_positives)) / log(1 / pow(2, log(2))));
CREATE FUNCTION bfEstimateFalsePositive [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, number_of_hash_functions, size_of_bloom_filter_in_bytes) -> pow(1 - exp(-number_of_hash_functions/ (size_of_bloom_filter_in_bytes / total_nubmer_of_all_grams)), number_of_hash_functions);
CREATE FUNCTION bfEstimateGramNumber [ON CLUSTER cluster]
AS
(number_of_hash_functions, probability_of_false_positives, size_of_bloom_filter_in_bytes) -> ceil(size_of_bloom_filter_in_bytes / (-number_of_hash_functions / log(1 - exp(log(probability_of_false_positives) / number_of_hash_functions))))
```
To use those functions,we need to specify two parameter at least.
For example, if there 4300 ngrams in the granule and we expect false positives to be less than 0.0001. The other parameters can be estimated by executing following queries:
```sql
--- estimate number of bits in the filter
SELECT bfEstimateBmSize(4300, 0.0001) / 8 as size_of_bloom_filter_in_bytes;
┌─size_of_bloom_filter_in_bytes─┐
│ 10304 │
└───────────────────────────────┘
--- estimate number of hash functions
SELECT bfEstimateFunctions(4300, bfEstimateBmSize(4300, 0.0001)) as number_of_hash_functions
┌─number_of_hash_functions─┐
│ 13 │
└──────────────────────────┘
```
Of course, you can also use those functions to estimate parameters by other conditions.
The functions refer to the content [here](https://hur.st/bloomfilter).
#### Token Bloom Filter
The same as `ngrambf_v1`, but stores tokens instead of ngrams. Tokens are sequences separated by non-alphanumeric characters.

View File

@ -554,7 +554,8 @@ public:
if (capacity < size_to_reserve)
{
if (unlikely(MAX_STRING_SIZE < size_to_reserve))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", size_to_reserve);
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}",
size_to_reserve, MAX_STRING_SIZE);
size_t rounded_capacity = roundUpToPowerOfTwoOrZero(size_to_reserve);
chassert(rounded_capacity <= MAX_STRING_SIZE + 1); /// rounded_capacity <= 2^31
@ -624,7 +625,8 @@ public:
void changeImpl(StringRef value, Arena * arena)
{
if (unlikely(MAX_STRING_SIZE < value.size))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", value.size);
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}",
value.size, MAX_STRING_SIZE);
UInt32 value_size = static_cast<UInt32>(value.size);

View File

@ -342,6 +342,31 @@ void ZooKeeper::createAncestors(const std::string & path)
}
}
void ZooKeeper::checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
std::vector<std::string> paths_to_check;
size_t pos = 1;
while (true)
{
pos = path.find('/', pos);
if (pos == std::string::npos)
break;
paths_to_check.emplace_back(path.substr(0, pos));
++pos;
}
MultiExistsResponse response = exists(paths_to_check);
for (size_t i = 0; i < paths_to_check.size(); ++i)
{
if (response[i].error != Coordination::Error::ZOK)
{
/// Ephemeral nodes cannot have children
requests.emplace_back(makeCreateRequest(paths_to_check[i], "", CreateMode::Persistent));
}
}
}
Coordination::Error ZooKeeper::removeImpl(const std::string & path, int32_t version)
{
auto future_result = asyncTryRemoveNoThrow(path, version);

View File

@ -237,6 +237,8 @@ public:
/// Does not create the node itself.
void createAncestors(const std::string & path);
void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests);
/// Remove the node if the version matches. (if version == -1, remove any version).
void remove(const std::string & path, int32_t version = -1);
@ -522,8 +524,6 @@ public:
void setServerCompletelyStarted();
private:
friend class EphemeralNodeHolder;
void init(ZooKeeperArgs args_);
/// The following methods don't any throw exceptions but return error codes.

View File

@ -258,19 +258,22 @@ public:
Coordination::Error tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{
path_created.clear();
auto error = access(
"tryCreate",
path,
[&]() { return keeper->tryCreate(path, data, mode, path_created); },
[&](Coordination::Error &)
[&](Coordination::Error & code)
{
try
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
if (!path_created.empty() && (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
{
keeper->remove(path);
keeper->remove(path_created);
if (unlikely(logger))
LOG_TRACE(logger, "ZooKeeperWithFaultInjection cleanup: seed={} func={} path={}", seed, "tryCreate", path);
LOG_TRACE(logger, "ZooKeeperWithFaultInjection cleanup: seed={} func={} path={} path_created={} code={}",
seed, "tryCreate", path, path_created, code);
}
}
catch (const zkutil::KeeperException & e)
@ -278,10 +281,11 @@ public:
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection cleanup FAILED: seed={} func={} path={} code={} message={} ",
"ZooKeeperWithFaultInjection cleanup FAILED: seed={} func={} path={} path_created={} code={} message={} ",
seed,
"tryCreate",
path,
path_created,
e.code,
e.message());
}
@ -290,8 +294,8 @@ public:
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy))
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
ephemeral_nodes.push_back(path);
if (!path_created.empty() && (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
ephemeral_nodes.push_back(path_created);
}
return error;
@ -357,6 +361,10 @@ public:
return access("trySet", path, [&]() { return keeper->trySet(path, data, version, stat); });
}
void checkExistsAndGetCreateAncestorsOps(const std::string & path, Coordination::Requests & requests)
{
return access("checkExistsAndGetCreateAncestorsOps", path, [&]() { return keeper->checkExistsAndGetCreateAncestorsOps(path, requests); });
}
void handleEphemeralNodeExistenceNoFailureInjection(const std::string & path, const std::string & fast_delete_if_equal_value)
{

View File

@ -473,7 +473,8 @@ Float NO_INLINE buffered(const PODArray<UInt8> & keys, const PODArray<Float> & v
return map[0].result();
}
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wframe-larger-than"
template <size_t UNROLL_COUNT>
Float NO_INLINE really_unrolled(const PODArray<UInt8> & keys, const PODArray<Float> & values)
{
@ -496,6 +497,7 @@ Float NO_INLINE really_unrolled(const PODArray<UInt8> & keys, const PODArray<Flo
return map[0].result();
}
#pragma clang diagnostic pop
struct State4

View File

@ -1048,7 +1048,7 @@ void DatabaseReplicated::dropReplica(
assert(!database || database_zookeeper_path == database->zookeeper_path);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name: {}", full_replica_name);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);
auto zookeeper = Context::getGlobalContextInstance()->getZooKeeper();

View File

@ -761,6 +761,9 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
auto & attribute = attributes[attribute_index];
bool attribute_is_nullable = attribute.is_nullable_sets.has_value();
/// Number of elements should not take into account multiple attributes.
new_element_count = 0;
getAttributeContainers(attribute_index, [&](auto & containers)
{
using ContainerType = std::decay_t<decltype(containers.front())>;
@ -957,6 +960,15 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
{
/// bucket_count should be a sum over all shards (CollectionsHolder),
/// but it should not be a sum over all attributes, since it is used to
/// calculate load_factor like this:
///
/// element_count / bucket_count
///
/// While element_count is a sum over all shards, not over all attributes.
bucket_count = 0;
getAttributeContainers(attribute_index, [&](const auto & containers)
{
for (const auto & container : containers)
@ -973,12 +985,12 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count = container.bucket_count();
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
bucket_count += container.getBufferSizeInCells();
}
}
});
@ -1002,12 +1014,12 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
if constexpr (sparse)
{
bytes_allocated += container.size() * (sizeof(KeyType));
bucket_count = container.bucket_count();
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
bucket_count += container.getBufferSizeInCells();
}
}
}

View File

@ -150,7 +150,7 @@ namespace
if (text == "bc")
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Era BC exceeds the range of DateTime");
else if (text != "ad")
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Unknown era {}", text);
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Unknown era {} (expected 'ad' or 'bc')", text);
}
void setCentury(Int32 century)

View File

@ -33,7 +33,7 @@ size_t HTTPChunkedReadBuffer::readChunkHeader()
} while (!in->eof() && isHexDigit(*in->position()));
if (res > max_chunk_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Chunk size exceeded the limit");
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Chunk size exceeded the limit (max size: {})", max_chunk_size);
/// NOTE: If we want to read any chunk extensions, it should be done here.

View File

@ -43,7 +43,7 @@ FileSegment::FileSegment(
, key_metadata(key_metadata_)
, queue_iterator(queue_iterator_)
, cache(cache_)
#ifndef NDEBUG
#ifdef ABORT_ON_LOGICAL_ERROR
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", key_.toString(), range().toString())))
#else
, log(&Poco::Logger::get("FileSegment"))
@ -56,6 +56,7 @@ FileSegment::FileSegment(
/// someone will _potentially_ want to download it (after calling getOrSetDownloader()).
case (State::EMPTY):
{
chassert(key_metadata.lock());
break;
}
/// DOWNLOADED is used either on initial cache metadata load into memory on server startup
@ -65,6 +66,7 @@ FileSegment::FileSegment(
reserved_size = downloaded_size = size_;
chassert(fs::file_size(getPathInLocalCache()) == size_);
chassert(queue_iterator);
chassert(key_metadata.lock());
break;
}
case (State::DETACHED):
@ -91,8 +93,16 @@ String FileSegment::getPathInLocalCache() const
return getKeyMetadata()->getFileSegmentPath(*this);
}
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock &)
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock & lock)
{
if (isCompleted(false) && state != State::DETACHED)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Updating state to {} of file segment is not allowed, because it is already completed ({})",
stateToString(state), getInfoForLogUnlocked(lock));
}
LOG_TEST(log, "Updated state from {} to {}", stateToString(download_state), stateToString(state));
download_state = state;
}
@ -182,12 +192,13 @@ String FileSegment::getOrSetDownloader()
if (current_downloader.empty())
{
const auto caller_id = getCallerId();
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED || !caller_id.starts_with("None");
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
if (!allow_new_downloader)
return "notAllowed:" + stateToString(download_state);
current_downloader = downloader_id = caller_id;
setDownloadState(State::DOWNLOADING, lock);
chassert(key_metadata.lock());
}
return current_downloader;

View File

@ -6,12 +6,14 @@
#include <Interpreters/getColumnFromBlock.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/StorageMemory.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISource.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
@ -93,29 +95,39 @@ private:
InitializerFunc initializer_func;
};
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(Pipe pipe_) :
SourceStepWithFilter(DataStream{.header = pipe_.getHeader()}),
pipe(std::move(pipe_))
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
const size_t num_streams_,
const bool delay_read_for_global_sub_queries_) :
SourceStepWithFilter(DataStream{.header=storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}),
columns_to_read(columns_to_read_),
storage_snapshot(storage_snapshot_),
num_streams(num_streams_),
delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
{
}
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
// use move - make sure that the call will only be made once.
auto pipe = makePipe();
if (pipe.empty())
{
assert(output_stream != std::nullopt);
pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
}
pipeline.init(std::move(pipe));
}
Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
const bool delay_read_for_global_sub_queries_)
Pipe ReadFromMemoryStorageStep::makePipe()
{
storage_snapshot_->check(columns_to_read_);
storage_snapshot->check(columns_to_read);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot_->data);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot->data);
auto current_data = snapshot_data.blocks;
if (delay_read_for_global_sub_queries_)
if (delay_read_for_global_sub_queries)
{
/// Note: for global subquery we use single source.
/// Mainly, the reason is that at this point table is empty,
@ -126,8 +138,8 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
return Pipe(std::make_shared<MemorySource>(
columns_to_read_,
storage_snapshot_,
columns_to_read,
storage_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[current_data](std::shared_ptr<const Blocks> & data_to_initialize)
@ -138,16 +150,16 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
size_t size = current_data->size();
if (num_streams_ > size)
num_streams_ = size;
if (num_streams > size)
num_streams = size;
Pipes pipes;
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);
for (size_t stream = 0; stream < num_streams_; ++stream)
for (size_t stream = 0; stream < num_streams; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read_, storage_snapshot_, current_data, parallel_execution_index));
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read, storage_snapshot, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));
}

View File

@ -5,6 +5,7 @@
#include <Interpreters/TreeRewriter.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -14,7 +15,10 @@ class QueryPipelineBuilder;
class ReadFromMemoryStorageStep final : public SourceStepWithFilter
{
public:
explicit ReadFromMemoryStorageStep(Pipe pipe_);
ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
ReadFromMemoryStorageStep() = delete;
ReadFromMemoryStorageStep(const ReadFromMemoryStorageStep &) = delete;
@ -27,14 +31,15 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
static Pipe makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
private:
static constexpr auto name = "ReadFromMemoryStorage";
Pipe pipe;
Names columns_to_read;
StorageSnapshotPtr storage_snapshot;
size_t num_streams;
bool delay_read_for_global_sub_queries;
Pipe makePipe();
};
}

View File

@ -2012,7 +2012,7 @@ struct WindowFunctionNtile final : public WindowFunction
if (!buckets)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must > 0");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must be greater than 0");
}
}
// new partition

View File

@ -578,6 +578,9 @@ void DataPartStorageOnDiskBase::remove(
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());
return;
}
throw;
@ -588,6 +591,10 @@ void DataPartStorageOnDiskBase::remove(
{
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, from));
/// We will never touch this part again, so unlocking it from zero-copy
if (!can_remove_description)
can_remove_description.emplace(can_remove_callback());
return;
}
throw;

View File

@ -821,6 +821,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
const auto data_settings = data.getSettings();
MergeTreeData::DataPart::Checksums data_checksums;
zkutil::EphemeralNodeHolderPtr zero_copy_temporary_lock_holder;
if (to_remote_disk)
{
readStringBinary(part_id, in);
@ -829,7 +830,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
LOG_DEBUG(log, "Downloading part {} unique id {} metadata onto disk {}.", part_name, part_id, disk->getName());
data.lockSharedDataTemporary(part_name, part_id, disk);
zero_copy_temporary_lock_holder = data.lockSharedDataTemporary(part_name, part_id, disk);
}
else
{
@ -938,7 +939,6 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
if (to_remote_disk)
{
data.lockSharedData(*new_data_part, /* replace_existing_lock = */ true, {});
LOG_DEBUG(log, "Download of part {} unique id {} metadata onto disk {} finished.", part_name, part_id, disk->getName());
}
else
@ -948,6 +948,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDisk(
LOG_DEBUG(log, "Download of part {} onto disk {} finished.", part_name, disk->getName());
}
if (zero_copy_temporary_lock_holder)
zero_copy_temporary_lock_holder->setAlreadyRemoved();
return new_data_part;
}

View File

@ -144,7 +144,8 @@ StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr &
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
}
Pipe StorageMemory::read(
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
@ -153,29 +154,7 @@ Pipe StorageMemory::read(
size_t /*max_block_size*/,
size_t num_streams)
{
return ReadFromMemoryStorageStep::makePipe(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries);
}
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
// @TODO it looks like IStorage::readFromPipe. different only step's type.
auto pipe = read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
return;
}
auto read_step = std::make_unique<ReadFromMemoryStorageStep>(std::move(pipe));
query_plan.addStep(std::move(read_step));
query_plan.addStep(std::make_unique<ReadFromMemoryStorageStep>(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries));
}

View File

@ -45,15 +45,6 @@ public:
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,

View File

@ -1473,16 +1473,18 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
}
MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAndCommit(Transaction & transaction,
const MutableDataPartPtr & part, std::optional<MergeTreeData::HardlinkedFiles> hardlinked_files)
const MutableDataPartPtr & part, std::optional<MergeTreeData::HardlinkedFiles> hardlinked_files, bool replace_zero_copy_lock)
{
auto zookeeper = getZooKeeper();
while (true)
{
LOG_DEBUG(log, "Committing part {} to zookeeper", part->name);
Coordination::Requests ops;
NameSet absent_part_paths_on_replicas;
lockSharedData(*part, false, hardlinked_files);
getLockSharedDataOps(*part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_zero_copy_lock, hardlinked_files, ops);
size_t zero_copy_lock_ops_size = ops.size();
/// Checksums are checked here and `ops` is filled. In fact, the part is added to ZK just below, when executing `multi`.
checkPartChecksumsAndAddCommitOps(zookeeper, part, ops, part->name, &absent_part_paths_on_replicas);
@ -1510,11 +1512,14 @@ MergeTreeData::DataPartsVector StorageReplicatedMergeTree::checkPartChecksumsAnd
Coordination::Responses responses;
Coordination::Error e = zookeeper->tryMulti(ops, responses);
if (e == Coordination::Error::ZOK)
{
LOG_DEBUG(log, "Part {} committed to zookeeper", part->name);
return transaction.commit();
}
if (e == Coordination::Error::ZNODEEXISTS)
{
size_t num_check_ops = 2 * absent_part_paths_on_replicas.size();
size_t num_check_ops = 2 * absent_part_paths_on_replicas.size() + zero_copy_lock_ops_size;
size_t failed_op_index = zkutil::getFailedOpIndex(e, responses);
if (failed_op_index < num_check_ops)
{
@ -4165,7 +4170,7 @@ bool StorageReplicatedMergeTree::fetchPart(
Transaction transaction(*this, NO_TRANSACTION_RAW);
renameTempPartAndReplace(part, transaction);
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files);
replaced_parts = checkPartChecksumsAndCommit(transaction, part, hardlinked_files, !part_to_clone);
/** If a quorum is tracked for this part, you must update it.
* If you do not have time, in case of losing the session, when you restart the server - see the `ReplicatedMergeTreeRestartingThread::updateQuorumIfWeHavePart` method.
@ -8116,31 +8121,31 @@ std::optional<String> StorageReplicatedMergeTree::tryGetTableSharedIDFromCreateQ
}
void StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
zkutil::EphemeralNodeHolderPtr StorageReplicatedMergeTree::lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const
{
auto settings = getSettings();
if (!disk || !disk->supportZeroCopyReplication() || !settings->allow_remote_fs_zero_copy_replication)
return;
return {};
zkutil::ZooKeeperPtr zookeeper = tryGetZooKeeper();
if (!zookeeper)
return;
return {};
String id = part_id;
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path);
String zc_zookeeper_path = getZeroCopyPartPath(*getSettings(), toString(disk->getDataSourceDescription().type), getTableSharedID(),
part_name, zookeeper_path)[0];
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
}
LOG_TRACE(log, "Set zookeeper temporary ephemeral lock {}", zookeeper_node);
createZeroCopyLockNode(
std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), zookeeper_node, zkutil::CreateMode::Ephemeral, false);
LOG_TRACE(log, "Zookeeper temporary ephemeral lock {} created", zookeeper_node);
return zkutil::EphemeralNodeHolder::existing(zookeeper_node, *zookeeper);
}
void StorageReplicatedMergeTree::lockSharedData(
@ -8148,6 +8153,7 @@ void StorageReplicatedMergeTree::lockSharedData(
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const
{
LOG_DEBUG(log, "Trying to create zero-copy lock for part {}", part.name);
auto zookeeper = tryGetZooKeeper();
if (zookeeper)
return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(zookeeper), replace_existing_lock, hardlinked_files);
@ -8155,6 +8161,54 @@ void StorageReplicatedMergeTree::lockSharedData(
return lockSharedData(part, std::make_shared<ZooKeeperWithFaultInjection>(nullptr), replace_existing_lock, hardlinked_files);
}
void StorageReplicatedMergeTree::getLockSharedDataOps(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const
{
auto settings = getSettings();
if (!part.isStoredOnDisk() || !settings->allow_remote_fs_zero_copy_replication)
return;
if (!part.getDataPartStorage().supportZeroCopyReplication())
return;
if (zookeeper->isNull())
return;
String id = part.getUniqueId();
boost::replace_all(id, "/", "_");
Strings zc_zookeeper_paths = getZeroCopyPartPath(
*getSettings(), part.getDataPartStorage().getDiskType(), getTableSharedID(),
part.name, zookeeper_path);
String path_to_set_hardlinked_files;
NameSet hardlinks;
if (hardlinked_files.has_value() && !hardlinked_files->hardlinks_from_source_part.empty())
{
path_to_set_hardlinked_files = getZeroCopyPartPath(
*getSettings(), part.getDataPartStorage().getDiskType(), hardlinked_files->source_table_shared_id,
hardlinked_files->source_part_name, zookeeper_path)[0];
hardlinks = hardlinked_files->hardlinks_from_source_part;
}
for (const auto & zc_zookeeper_path : zc_zookeeper_paths)
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
getZeroCopyLockNodeCreateOps(
zookeeper, zookeeper_node, requests, zkutil::CreateMode::Persistent,
replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
}
}
void StorageReplicatedMergeTree::lockSharedData(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
@ -8195,11 +8249,13 @@ void StorageReplicatedMergeTree::lockSharedData(
{
String zookeeper_node = fs::path(zc_zookeeper_path) / id / replica_name;
LOG_TRACE(log, "Set zookeeper persistent lock {}", zookeeper_node);
LOG_TRACE(log, "Trying to create zookeeper persistent lock {}", zookeeper_node);
createZeroCopyLockNode(
zookeeper, zookeeper_node, zkutil::CreateMode::Persistent,
replace_existing_lock, path_to_set_hardlinked_files, hardlinks);
LOG_TRACE(log, "Zookeeper persistent lock {} created", zookeeper_node);
}
}
@ -8333,6 +8389,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
/// all_0_0_0_1
/// all_0_0_0
std::sort(parts_infos.begin(), parts_infos.end());
std::string part_info_str = part_info.getPartNameV1();
/// In reverse order to process from bigger to smaller
for (const auto & [parent_candidate_info, part_candidate_info_str] : parts_infos | std::views::reverse)
@ -8343,7 +8400,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
/// We are mutation child of this parent
if (part_info.isMutationChildOf(parent_candidate_info))
{
LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info.getPartNameV1());
LOG_TRACE(log, "Found mutation parent {} for part {}", part_candidate_info_str, part_info_str);
/// Get hardlinked files
String files_not_to_remove_str;
Coordination::Error code;
@ -8360,6 +8417,7 @@ std::pair<bool, NameSet> getParentLockedBlobs(const ZooKeeperWithFaultInjectionP
return {true, files_not_to_remove};
}
}
LOG_TRACE(log, "No mutation parent found for part {}", part_info_str);
return {false, files_not_to_remove};
}
@ -8411,6 +8469,10 @@ std::pair<bool, NameSet> StorageReplicatedMergeTree::unlockSharedDataByID(
LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, refuse to remove blobs", zookeeper_part_replica_node, part_name);
return {false, {}};
}
else
{
LOG_INFO(logger, "Lock on path {} for part {} doesn't exist, but we don't have mutation parent, can remove blobs", zookeeper_part_replica_node, part_name);
}
}
else
{
@ -8931,6 +8993,46 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
return true;
}
void StorageReplicatedMergeTree::getZeroCopyLockNodeCreateOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
int32_t mode, bool replace_existing_lock,
const String & path_to_set_hardlinked_files, const NameSet & hardlinked_files)
{
/// Ephemeral locks can be created only when we fetch shared data.
/// So it never require to create ancestors. If we create them
/// race condition with source replica drop is possible.
if (mode == zkutil::CreateMode::Persistent)
zookeeper->checkExistsAndGetCreateAncestorsOps(zookeeper_node, requests);
if (replace_existing_lock && zookeeper->exists(zookeeper_node))
{
requests.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
}
else
{
Coordination::Requests ops;
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
requests.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
requests.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
}
}
void StorageReplicatedMergeTree::createZeroCopyLockNode(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, int32_t mode,
@ -8942,75 +9044,49 @@ void StorageReplicatedMergeTree::createZeroCopyLockNode(
bool created = false;
for (int attempts = 5; attempts > 0; --attempts)
{
try
Coordination::Requests ops;
Coordination::Responses responses;
getZeroCopyLockNodeCreateOps(zookeeper, zookeeper_node, ops, mode, replace_existing_lock, path_to_set_hardlinked_files, hardlinked_files);
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
{
/// Ephemeral locks can be created only when we fetch shared data.
/// So it never require to create ancestors. If we create them
/// race condition with source replica drop is possible.
if (mode == zkutil::CreateMode::Persistent)
zookeeper->createAncestors(zookeeper_node);
if (replace_existing_lock && zookeeper->exists(zookeeper_node))
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(zookeeper_node, -1));
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK)
{
created = true;
break;
}
else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
{
throw Exception(ErrorCodes::NOT_FOUND_NODE,
"Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
}
}
else
{
Coordination::Requests ops;
if (!path_to_set_hardlinked_files.empty() && !hardlinked_files.empty())
{
std::string data = boost::algorithm::join(hardlinked_files, "\n");
/// List of files used to detect hardlinks. path_to_set_hardlinked_files --
/// is a path to source part zero copy node. During part removal hardlinked
/// files will be left for source part.
ops.emplace_back(zkutil::makeSetRequest(path_to_set_hardlinked_files, data, -1));
}
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_node, "", mode));
Coordination::Responses responses;
auto error = zookeeper->tryMulti(ops, responses);
if (error == Coordination::Error::ZOK || error == Coordination::Error::ZNODEEXISTS)
{
created = true;
break;
}
else if (error == Coordination::Error::ZNONODE && mode != zkutil::CreateMode::Persistent)
{
/// Ephemeral locks used during fetches so if parent node was removed we cannot do anything
throw Exception(ErrorCodes::NOT_FOUND_NODE,
"Cannot create ephemeral zero copy lock {} because part was unlocked from zookeeper", zookeeper_node);
}
}
created = true;
break;
}
catch (const zkutil::KeeperException & e)
else if (mode == zkutil::CreateMode::Persistent)
{
if (e.code == Coordination::Error::ZNONODE)
if (error == Coordination::Error::ZNONODE)
continue;
throw;
if (error == Coordination::Error::ZNODEEXISTS)
{
size_t failed_op = zkutil::getFailedOpIndex(error, responses);
/// Part was locked before, unfortunately it's possible during moves
if (ops[failed_op]->getPath() == zookeeper_node)
{
created = true;
break;
}
continue;
}
}
else if (mode == zkutil::CreateMode::Ephemeral)
{
/// It is super rare case when we had part, but it was lost and we were unable to unlock it from keeper.
/// Now we are trying to fetch it from other replica and unlocking.
if (error == Coordination::Error::ZNODEEXISTS)
{
size_t failed_op = zkutil::getFailedOpIndex(error, responses);
if (ops[failed_op]->getPath() == zookeeper_node)
{
LOG_WARNING(&Poco::Logger::get("ZeroCopyLocks"), "Replacing persistent lock with ephemeral for path {}. It can happen only in case of local part loss", zookeeper_node);
replace_existing_lock = true;
continue;
}
}
}
zkutil::KeeperMultiException::check(error, ops, responses);
}
if (!created)

View File

@ -252,7 +252,14 @@ public:
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files) const;
void lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
void getLockSharedDataOps(
const IMergeTreeDataPart & part,
const ZooKeeperWithFaultInjectionPtr & zookeeper,
bool replace_existing_lock,
std::optional<HardlinkedFiles> hardlinked_files,
Coordination::Requests & requests) const;
zkutil::EphemeralNodeHolderPtr lockSharedDataTemporary(const String & part_name, const String & part_id, const DiskPtr & disk) const;
/// Unlock shared data part in zookeeper
/// Return true if data unlocked
@ -542,7 +549,7 @@ private:
String getChecksumsForZooKeeper(const MergeTreeDataPartChecksums & checksums) const;
/// Accepts a PreActive part, atomically checks its checksums with ones on other replicas and commit the part
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {});
DataPartsVector checkPartChecksumsAndCommit(Transaction & transaction, const MutableDataPartPtr & part, std::optional<HardlinkedFiles> hardlinked_files = {}, bool replace_zero_copy_lock=false);
bool partIsAssignedToBackgroundOperation(const DataPartPtr & part) const override;
@ -861,6 +868,12 @@ private:
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
static void getZeroCopyLockNodeCreateOps(
const ZooKeeperWithFaultInjectionPtr & zookeeper, const String & zookeeper_node, Coordination::Requests & requests,
int32_t mode = zkutil::CreateMode::Persistent, bool replace_existing_lock = false,
const String & path_to_set_hardlinked_files = "", const NameSet & hardlinked_files = {});
bool removeDetachedPart(DiskPtr disk, const String & path, const String & part_name) override;
/// Create freeze metadata for table and save in zookeeper. Required only if zero-copy replication enabled.

View File

@ -123,7 +123,6 @@
02713_array_low_cardinality_string
02707_skip_index_with_in
02707_complex_query_fails_analyzer
02680_mysql_ast_logical_err
02324_map_combinator_bug
02241_join_rocksdb_bs
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET
@ -136,4 +135,4 @@
00261_storage_aliases_and_array_join
02701_non_parametric_function
01825_type_json_multiple_files
01281_group_by_limit_memory_tracking
01281_group_by_limit_memory_tracking

View File

@ -10,8 +10,8 @@ from typing import Any, Callable, List
import requests # type: ignore
import get_robot_token as grt # we need an updated ROBOT_TOKEN
from ci_config import CI_CONFIG
from get_robot_token import ROBOT_TOKEN, get_best_robot_token
DOWNLOAD_RETRIES_COUNT = 5
@ -56,13 +56,18 @@ def get_gh_api(
def set_auth_header():
if "headers" in kwargs:
if "Authorization" not in kwargs["headers"]:
kwargs["headers"]["Authorization"] = f"Bearer {get_best_robot_token()}"
kwargs["headers"][
"Authorization"
] = f"Bearer {grt.get_best_robot_token()}"
else:
kwargs["headers"] = {"Authorization": f"Bearer {get_best_robot_token()}"}
kwargs["headers"] = {
"Authorization": f"Bearer {grt.get_best_robot_token()}"
}
if ROBOT_TOKEN is not None:
if grt.ROBOT_TOKEN is not None:
set_auth_header()
need_retry = False
for _ in range(retries):
try:
response = get_with_retries(url, 1, sleep, **kwargs)
@ -78,9 +83,11 @@ def get_gh_api(
"Received rate limit exception, setting the auth header and retry"
)
set_auth_header()
need_retry = True
break
return get_with_retries(url, retries, sleep, **kwargs)
if need_retry:
return get_with_retries(url, retries, sleep, **kwargs)
def get_build_name_for_check(check_name: str) -> str:

View File

@ -417,7 +417,7 @@ CHECK_DESCRIPTIONS = [
"AST fuzzer",
"Runs randomly generated queries to catch program errors. "
"The build type is optionally given in parenthesis. "
"If it fails, ask a maintainer for help.",
"If it fails, ask a maintainer for help",
lambda x: x.startswith("AST fuzzer"),
),
CheckDescription(
@ -439,13 +439,13 @@ CHECK_DESCRIPTIONS = [
"information to fix the error, but you might have to reproduce the failure "
"locally. The <b>cmake</b> options can be found in the build log, grepping for "
'<b>cmake</b>. Use these options and follow the <a href="'
'https://clickhouse.com/docs/en/development/build">general build process</a>.',
'https://clickhouse.com/docs/en/development/build">general build process</a>',
lambda x: x.startswith("ClickHouse") and x.endswith("build check"),
),
CheckDescription(
"Compatibility check",
"Checks that <b>clickhouse</b> binary runs on distributions with old libc "
"versions. If it fails, ask a maintainer for help.",
"versions. If it fails, ask a maintainer for help",
lambda x: x.startswith("Compatibility check"),
),
CheckDescription(
@ -465,12 +465,18 @@ CHECK_DESCRIPTIONS = [
"omitting some. If it fails, further checks are not started until it is fixed. "
"Look at the report to see which tests fail, then reproduce the failure "
'locally as described <a href="https://clickhouse.com/docs/en/development/'
'tests#functional-test-locally">here</a>.',
'tests#functional-test-locally">here</a>',
lambda x: x == "Fast test",
),
CheckDescription(
"Flaky tests",
"Checks if new added or modified tests are flaky by running them repeatedly, in parallel, with more randomization. Functional tests are run 100 times with address sanitizer, and additional randomization of thread scheduling. Integrational tests are run up to 10 times. If at least once a new test has failed, or was too long, this check will be red. We don't allow flaky tests, read https://clickhouse.com/blog/decorating-a-christmas-tree-with-the-help-of-flaky-tests/",
"Checks if new added or modified tests are flaky by running them repeatedly, "
"in parallel, with more randomization. Functional tests are run 100 times "
"with address sanitizer, and additional randomization of thread scheduling. "
"Integrational tests are run up to 10 times. If at least once a new test has "
"failed, or was too long, this check will be red. We don't allow flaky tests, "
'read <a href="https://clickhouse.com/blog/decorating-a-christmas-tree-with-'
'the-help-of-flaky-tests/">the doc</a>',
lambda x: "tests flaky check" in x,
),
CheckDescription(
@ -506,37 +512,37 @@ CHECK_DESCRIPTIONS = [
"Sqllogic",
"Run clickhouse on the "
'<a href="https://www.sqlite.org/sqllogictest">sqllogic</a> '
"test set against sqlite and checks that all statements are passed.",
"test set against sqlite and checks that all statements are passed",
lambda x: x.startswith("Sqllogic test"),
),
CheckDescription(
"SQLancer",
"Fuzzing tests that detect logical bugs with "
'<a href="https://github.com/sqlancer/sqlancer">SQLancer</a> tool.',
'<a href="https://github.com/sqlancer/sqlancer">SQLancer</a> tool',
lambda x: x.startswith("SQLancer"),
),
CheckDescription(
"Stateful tests",
"Runs stateful functional tests for ClickHouse binaries built in various "
"configurations -- release, debug, with sanitizers, etc.",
"configurations -- release, debug, with sanitizers, etc",
lambda x: x.startswith("Stateful tests ("),
),
CheckDescription(
"Stateless tests",
"Runs stateless functional tests for ClickHouse binaries built in various "
"configurations -- release, debug, with sanitizers, etc.",
"configurations -- release, debug, with sanitizers, etc",
lambda x: x.startswith("Stateless tests ("),
),
CheckDescription(
"Stress test",
"Runs stateless functional tests concurrently from several clients to detect "
"concurrency-related errors.",
"concurrency-related errors",
lambda x: x.startswith("Stress test ("),
),
CheckDescription(
"Style Check",
"Runs a set of checks to keep the code style clean. If some of tests failed, "
"see the related log from the report.",
"see the related log from the report",
lambda x: x == "Style Check",
),
CheckDescription(
@ -548,7 +554,7 @@ CHECK_DESCRIPTIONS = [
"Upgrade check",
"Runs stress tests on server version from last release and then tries to "
"upgrade it to the version from the PR. It checks if the new server can "
"successfully startup without any errors, crashes or sanitizer asserts.",
"successfully startup without any errors, crashes or sanitizer asserts",
lambda x: x.startswith("Upgrade check ("),
),
CheckDescription(

View File

@ -101,7 +101,21 @@ def post_commit_status(
raise ex
time.sleep(i)
if pr_info:
set_status_comment(commit, pr_info)
status_updated = False
for i in range(RETRY):
try:
set_status_comment(commit, pr_info)
status_updated = True
break
except Exception as ex:
logging.warning(
"Failed to update the status commit, will retry %s times: %s",
RETRY - i - 1,
ex,
)
if not status_updated:
logging.error("Failed to update the status comment, continue anyway")
def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
@ -116,6 +130,18 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
if not statuses:
return
if not [status for status in statuses if status.context == CI_STATUS_NAME]:
# This is the case, when some statuses already exist for the check,
# but not the CI_STATUS_NAME. We should create it as pending.
# W/o pr_info to avoid recursion, and yes, one extra create_ci_report
post_commit_status(
commit,
"pending",
create_ci_report(pr_info, statuses),
"The report for running CI",
CI_STATUS_NAME,
)
# We update the report in generate_status_comment function, so do it each
# run, even in the release PRs and normal pushes
comment_body = generate_status_comment(pr_info, statuses)

View File

@ -2117,7 +2117,14 @@ def reportLogStats(args):
'Column ''{}'' already exists', 'No macro {} in config', 'Invalid origin H3 index: {}',
'Invalid session timeout: ''{}''', 'Tuple cannot be empty', 'Database name is empty',
'Table {} is not a Dictionary', 'Expected function, got: {}', 'Unknown identifier: ''{}''',
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist'
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist',
'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}'
) AS known_short_messages
SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
FROM system.text_log
@ -2256,7 +2263,7 @@ def main(args):
"\nFound hung queries in processlist:", args, "red", attrs=["bold"]
)
)
print(json.dumps(processlist, indent=4))
print(processlist)
print(get_transactions_list(args))
print_stacktraces()

View File

@ -2,7 +2,7 @@ runtime messages 0.001
runtime exceptions 0.05
messages shorter than 10 1
messages shorter than 16 3
exceptions shorter than 30 30
exceptions shorter than 30 3
noisy messages 0.3
noisy Trace messages 0.16
noisy Debug messages 0.09

View File

@ -49,7 +49,14 @@ create temporary table known_short_messages (s String) as select * from (select
'Column ''{}'' already exists', 'No macro {} in config', 'Invalid origin H3 index: {}',
'Invalid session timeout: ''{}''', 'Tuple cannot be empty', 'Database name is empty',
'Table {} is not a Dictionary', 'Expected function, got: {}', 'Unknown identifier: ''{}''',
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist'
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist',
'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}'
] as arr) array join arr;
-- Check that we don't have too many short meaningless message patterns.
@ -59,7 +66,7 @@ select 'messages shorter than 10', max2(countDistinctOrDefault(message_format_st
select 'messages shorter than 16', max2(countDistinctOrDefault(message_format_string), 3) from logs where length(message_format_string) < 16 and message_format_string not in known_short_messages;
-- Same as above, but exceptions must be more informative. Feel free to update the threshold or remove this query if really necessary
select 'exceptions shorter than 30', max2(countDistinctOrDefault(message_format_string), 30) from logs where length(message_format_string) < 30 and message ilike '%DB::Exception%' and message_format_string not in known_short_messages;
select 'exceptions shorter than 30', max2(countDistinctOrDefault(message_format_string), 3) from logs where length(message_format_string) < 30 and message ilike '%DB::Exception%' and message_format_string not in known_short_messages;
-- Avoid too noisy messages: top 1 message frequency must be less than 30%. We should reduce the threshold
@ -98,7 +105,9 @@ select 'incorrect patterns', max2(countDistinct(message_format_string), 15) from
where ((rand() % 8) = 0)
and message not like (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') as s)
and message not like (s || ' (skipped % similar messages)')
and message not like ('%Exception: '||s||'%') group by message_format_string
and message not like ('%Exception: '||s||'%')
and message not like ('%(skipped % similar messages)%')
group by message_format_string
) where any_message not like '%Poco::Exception%';
drop table logs;

View File

@ -1,2 +1,4 @@
CREATE TABLE foo (key UInt32, a String, b Int64, c String) ENGINE = TinyLog;
SELECT count() FROM mysql(mysql('127.0.0.1:9004', currentDatabase(), 'foo', 'default', ''), '127.0.0.1:9004', currentDatabase(), 'foo', '', ''); -- { serverError UNKNOWN_FUNCTION }
-- SELECT count() FROM mysql(mysql('127.0.0.1:9004', currentDatabase(), 'foo', 'default', '', SETTINGS connection_pool_size = 1), '127.0.0.1:9004', currentDatabase(), 'foo', '', ''); -- { serverError UNKNOWN_FUNCTION }
SELECT count() FROM mysql(mysql('127.0.0.1:9004', currentDatabase(), 'foo', 'default', '', SETTINGS connection_pool_size = 1), '127.0.0.1:9004', currentDatabase(), 'foo', '', ''); -- { serverError UNKNOWN_FUNCTION, UNSUPPORTED_METHOD }

View File

@ -0,0 +1,10 @@
DROP TABLE IF EXISTS keeper_fault_inject_sequential_cleanup;
CREATE TABLE keeper_fault_inject_sequential_cleanup (d Int8) ENGINE = ReplicatedMergeTree('/clickhouse/{database}/test_02725/tables/keeper_fault_inject_sequential_cleanup', '1') ORDER BY d;
INSERT INTO keeper_fault_inject_sequential_cleanup VALUES (1);
INSERT INTO keeper_fault_inject_sequential_cleanup SETTINGS insert_deduplicate = 0 VALUES (1);
INSERT INTO keeper_fault_inject_sequential_cleanup SETTINGS insert_deduplicate = 0, insert_keeper_fault_injection_probability = 0.4, insert_keeper_fault_injection_seed = 5619964844601345291 VALUES (1);
-- with database ordinary it produced a warning
DROP TABLE keeper_fault_inject_sequential_cleanup;

View File

@ -0,0 +1,78 @@
#!/usr/bin/env bash
# Tags: race, zookeeper, no-parallel, no-upgrade-check, no-replicated-database
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
NUM_REPLICAS=5
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -n -q "
DROP TABLE IF EXISTS r$i SYNC;
CREATE TABLE r$i (x UInt64) ENGINE = ReplicatedMergeTree('/clickhouse/tables/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/r', 'r$i') ORDER BY x SETTINGS replicated_deduplication_window = 1, allow_remote_fs_zero_copy_replication = 1;
"
done
function thread {
while true; do
REPLICA=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "INSERT INTO r$REPLICA SELECT rand()"
done
}
function nemesis_thread1 {
while true; do
REPLICA=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "SYSTEM STOP REPLICATED SENDS r$REPLICA"
sleep 0.5
$CLICKHOUSE_CLIENT --query "SYSTEM START REPLICATED SENDS r$REPLICA"
done
}
function nemesis_thread2 {
while true; do
REPLICA=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT --query "SYSTEM STOP FETCHES r$REPLICA"
sleep 0.5
$CLICKHOUSE_CLIENT --query "SYSTEM START FETCHES r$REPLICA"
done
}
export -f thread
export -f nemesis_thread1
export -f nemesis_thread2
TIMEOUT=20
timeout $TIMEOUT bash -c thread 2>/dev/null &
timeout $TIMEOUT bash -c thread 2>/dev/null &
timeout $TIMEOUT bash -c thread 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread1 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
timeout $TIMEOUT bash -c nemesis_thread2 2>/dev/null &
wait
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -q "SYSTEM START FETCHES r$REPLICA"
$CLICKHOUSE_CLIENT -q "SYSTEM START REPLICATED SENDS r$REPLICA"
done
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT --max_execution_time 60 -q "SYSTEM SYNC REPLICA r$i PULL"
done
for i in $(seq 1 $NUM_REPLICAS); do
$CLICKHOUSE_CLIENT -q "DROP TABLE r$i" 2>/dev/null &
done
wait

View File

@ -0,0 +1,2 @@
dict_sharded 1 1000000 0.4768
dict_sharded_multi 5 1000000 0.4768

View File

@ -0,0 +1,17 @@
DROP DICTIONARY IF EXISTS dict_sharded;
DROP DICTIONARY IF EXISTS dict_sharded_multi;
DROP TABLE IF EXISTS dict_data;
CREATE TABLE dict_data (key UInt64, v0 UInt16, v1 UInt16, v2 UInt16, v3 UInt16, v4 UInt16) engine=Memory() AS SELECT number, number%65535, number%65535, number%6553, number%655355, number%65535 FROM numbers(1e6);
CREATE DICTIONARY dict_sharded (key UInt64, v0 UInt16) PRIMARY KEY key SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(MIN 0 MAX 0) LAYOUT(HASHED(SHARDS 32));
SYSTEM RELOAD DICTIONARY dict_sharded;
SELECT name, length(attribute.names), element_count, round(load_factor, 4) FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_sharded';
DROP DICTIONARY dict_sharded;
CREATE DICTIONARY dict_sharded_multi (key UInt64, v0 UInt16, v1 UInt16, v2 UInt16, v3 UInt16, v4 UInt16) PRIMARY KEY key SOURCE(CLICKHOUSE(TABLE 'dict_data')) LIFETIME(MIN 0 MAX 0) LAYOUT(HASHED(SHARDS 32));
SYSTEM RELOAD DICTIONARY dict_sharded_multi;
SELECT name, length(attribute.names), element_count, round(load_factor, 4) FROM system.dictionaries WHERE database = currentDatabase() AND name = 'dict_sharded_multi';
DROP DICTIONARY dict_sharded_multi;
DROP TABLE dict_data;