Merge branch 'master' into fix_zero_copy_not_atomic

alesapin 2023-05-03 12:53:35 +02:00 committed by GitHub
commit 6f3f202f7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
121 changed files with 869 additions and 392 deletions

View File

@ -170,18 +170,6 @@ else ()
set(NO_WHOLE_ARCHIVE --no-whole-archive)
endif ()
option(ENABLE_CURL_BUILD "Enable curl, azure, sentry build on by default except MacOS." ON)
if (OS_DARWIN)
# Disable the curl, azure, sentry build on MacOS
set (ENABLE_CURL_BUILD OFF)
endif ()
option(ENABLE_ISAL_LIBRARY "Enable ISA-L library ON by default except on aarch64." ON)
if (ARCH_AARCH64)
# Disable ISA-L library on aarch64.
set (ENABLE_ISAL_LIBRARY OFF)
endif ()
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld")
@ -399,9 +387,9 @@ else()
endif ()
option (ENABLE_GWP_ASAN "Enable Gwp-Asan" ON)
# We use mmap for allocations more heavily in debug builds,
# but GWP-ASan also wants to use mmap frequently,
# and due to a large number of memory mappings,
# We use mmap for allocations more heavily in debug builds,
# but GWP-ASan also wants to use mmap frequently,
# and due to a large number of memory mappings,
# it does not work together well.
if ((NOT OS_LINUX AND NOT OS_ANDROID) OR (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG"))
set(ENABLE_GWP_ASAN OFF)

View File

@ -141,20 +141,19 @@ add_contrib (libuv-cmake libuv)
add_contrib (liburing-cmake liburing)
add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
add_contrib (cassandra-cmake cassandra) # requires: libuv
if (ENABLE_CURL_BUILD)
if (NOT OS_DARWIN)
add_contrib (curl-cmake curl)
add_contrib (azure-cmake azure)
add_contrib (sentry-native-cmake sentry-native) # requires: curl
endif()
add_contrib (fmtlib-cmake fmtlib)
add_contrib (krb5-cmake krb5)
add_contrib (cyrus-sasl-cmake cyrus-sasl) # for krb5
add_contrib (libgsasl-cmake libgsasl) # requires krb5
add_contrib (librdkafka-cmake librdkafka) # requires: libgsasl
add_contrib (nats-io-cmake nats-io)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5
add_contrib (isa-l-cmake isa-l)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5, isa-l
add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift/avro/arrow/libhdfs3
add_contrib (cppkafka-cmake cppkafka)
add_contrib (libpqxx-cmake libpqxx)
@ -178,23 +177,14 @@ add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)
endif()
add_contrib (annoy-cmake annoy)
add_contrib (xxHash-cmake xxHash)
add_contrib (google-benchmark-cmake google-benchmark)
add_contrib (ulid-c-cmake ulid-c)
if (ENABLE_ISAL_LIBRARY)
add_contrib (isa-l-cmake isa-l)
endif()
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some of third-party projects may override CMAKE_FOLDER or FOLDER property of their targets, so they would not appear
# in "contrib/..." as originally planned, so we workaround this by fixing FOLDER properties of all targets manually,

View File

@ -1,3 +1,7 @@
if (NOT ARCH_AMD64)
return()
endif ()
set(ISAL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/isa-l")
# The YASM and NASM assemblers are somewhat mutually compatible. ISAL specifically needs NASM. If only YASM is installed, then check_language(ASM_NASM)

View File

@ -172,7 +172,7 @@ if (TARGET OpenSSL::SSL)
target_link_libraries(_hdfs3 PRIVATE OpenSSL::Crypto OpenSSL::SSL)
endif()
if (ENABLE_ISAL_LIBRARY)
if (TARGET ch_contrib::isal)
target_link_libraries(_hdfs3 PRIVATE ch_contrib::isal)
add_definitions(-DHADOOP_ISAL_LIBRARY)
endif()

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -22,7 +22,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.4.2.11-stable (b6442320f9d) FIXME as compared to v23.4.1.1943-stable (3920eb987f7)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Revert "Fix GCS native copy ([#48981](https://github.com/ClickHouse/ClickHouse/issues/48981))" [#49194](https://github.com/ClickHouse/ClickHouse/pull/49194) ([Raúl Marín](https://github.com/Algunenano)).
* Fix race on Outdated parts loading [#49223](https://github.com/ClickHouse/ClickHouse/pull/49223) ([Alexander Tokmakov](https://github.com/tavplubix)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Implement status comment [#48468](https://github.com/ClickHouse/ClickHouse/pull/48468) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Update curl to 8.0.1 (for CVEs) [#48765](https://github.com/ClickHouse/ClickHouse/pull/48765) ([Boris Kuschel](https://github.com/bkuschel)).
* Fallback auth gh api [#49314](https://github.com/ClickHouse/ClickHouse/pull/49314) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -439,6 +439,50 @@ Syntax: `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions,
- `number_of_hash_functions` — The number of hash functions used in the Bloom filter.
- `random_seed` — The seed for Bloom filter hash functions.
Users can create a [UDF](/docs/en/sql-reference/statements/create/function.md) to estimate the parameter set of `ngrambf_v1`. The query statements are as follows:
```sql
CREATE FUNCTION bfEstimateFunctions [ON CLUSTER cluster]
AS
(total_number_of_all_grams, size_of_bloom_filter_in_bits) -> round((size_of_bloom_filter_in_bits / total_number_of_all_grams) * log(2));
CREATE FUNCTION bfEstimateBmSize [ON CLUSTER cluster]
AS
(total_number_of_all_grams, probability_of_false_positives) -> ceil((total_number_of_all_grams * log(probability_of_false_positives)) / log(1 / pow(2, log(2))));
CREATE FUNCTION bfEstimateFalsePositive [ON CLUSTER cluster]
AS
(total_number_of_all_grams, number_of_hash_functions, size_of_bloom_filter_in_bytes) -> pow(1 - exp(-number_of_hash_functions / (size_of_bloom_filter_in_bytes / total_number_of_all_grams)), number_of_hash_functions);
CREATE FUNCTION bfEstimateGramNumber [ON CLUSTER cluster]
AS
(number_of_hash_functions, probability_of_false_positives, size_of_bloom_filter_in_bytes) -> ceil(size_of_bloom_filter_in_bytes / (-number_of_hash_functions / log(1 - exp(log(probability_of_false_positives) / number_of_hash_functions))));
```
To use these functions, at least two parameters must be specified.
For example, if there are 4300 ngrams in the granule and we expect the false-positive rate to be less than 0.0001, the other parameters can be estimated by executing the following queries:
```sql
--- estimate number of bits in the filter
SELECT bfEstimateBmSize(4300, 0.0001) / 8 as size_of_bloom_filter_in_bytes;
┌─size_of_bloom_filter_in_bytes─┐
│ 10304 │
└───────────────────────────────┘
--- estimate number of hash functions
SELECT bfEstimateFunctions(4300, bfEstimateBmSize(4300, 0.0001)) as number_of_hash_functions
┌─number_of_hash_functions─┐
│ 13 │
└──────────────────────────┘
```
You can also use these functions to estimate the parameters from other starting conditions.
The functions are based on the formulas described [here](https://hur.st/bloomfilter).
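As an illustrative sketch, the estimated values can be used directly in an index definition. Here the table `tab`, the column `str`, and the ngram size `3` are arbitrary placeholders, `10304` and `13` are taken from the example queries above, and the seed is left at `0`:
```sql
CREATE TABLE tab
(
    id UInt64,
    str String,
    -- ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions, random_seed)
    INDEX idx_str str TYPE ngrambf_v1(3, 10304, 13, 0) GRANULARITY 1
)
ENGINE = MergeTree
ORDER BY id;
```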
#### Token Bloom Filter
The same as `ngrambf_v1`, but stores tokens instead of ngrams. Tokens are sequences separated by non-alphanumeric characters.
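Assuming a similar number of tokens per granule, the same estimates could be applied to a token Bloom filter index. A hypothetical sketch reusing the placeholder table and column from above:
```sql
ALTER TABLE tab ADD INDEX idx_str_tokens str TYPE tokenbf_v1(10304, 13, 0) GRANULARITY 1;
```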

View File

@ -554,7 +554,8 @@ public:
if (capacity < size_to_reserve)
{
if (unlikely(MAX_STRING_SIZE < size_to_reserve))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", size_to_reserve);
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}",
size_to_reserve, MAX_STRING_SIZE);
size_t rounded_capacity = roundUpToPowerOfTwoOrZero(size_to_reserve);
chassert(rounded_capacity <= MAX_STRING_SIZE + 1); /// rounded_capacity <= 2^31
@ -624,7 +625,8 @@ public:
void changeImpl(StringRef value, Arena * arena)
{
if (unlikely(MAX_STRING_SIZE < value.size))
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({})", value.size);
throw Exception(ErrorCodes::TOO_LARGE_STRING_SIZE, "String size is too big ({}), maximum: {}",
value.size, MAX_STRING_SIZE);
UInt32 value_size = static_cast<UInt32>(value.size);

View File

@ -50,7 +50,7 @@ struct AggregateFunctionSequenceMatchData final
bool sorted = true;
PODArrayWithStackMemory<TimestampEvents, 64> events_list;
/// sequenceMatch conditions met at least once in events_list
std::bitset<max_events> conditions_met;
Events conditions_met;
void add(const Timestamp timestamp, const Events & events)
{
@ -101,6 +101,11 @@ struct AggregateFunctionSequenceMatchData final
size_t size;
readBinary(size, buf);
/// If we lose these flags, functionality is broken
/// If we serialize/deserialize these flags, we have compatibility issues
/// If we set these flags to 1, we have a minor performance penalty, which seems acceptable
conditions_met.set();
events_list.clear();
events_list.reserve(size);

View File

@ -319,6 +319,9 @@ struct CheckRequest : virtual Request
String path;
int32_t version = -1;
/// should it check if a node DOES NOT exist
bool not_exists = false;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
@ -524,7 +527,7 @@ public:
const Requests & requests,
MultiCallback callback) = 0;
virtual DB::KeeperApiVersion getApiVersion() = 0;
virtual DB::KeeperApiVersion getApiVersion() const = 0;
/// Expire session and finish all pending requests
virtual void finalize(const String & reason) = 0;

View File

@ -91,7 +91,7 @@ public:
void finalize(const String & reason) override;
DB::KeeperApiVersion getApiVersion() override
DB::KeeperApiVersion getApiVersion() const override
{
return KeeperApiVersion::ZOOKEEPER_COMPATIBLE;
}

View File

@ -846,7 +846,7 @@ bool ZooKeeper::expired()
return impl->isExpired();
}
DB::KeeperApiVersion ZooKeeper::getApiVersion()
DB::KeeperApiVersion ZooKeeper::getApiVersion() const
{
return impl->getApiVersion();
}
@ -1307,7 +1307,6 @@ Coordination::RequestPtr makeExistsRequest(const std::string & path)
return request;
}
std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')

View File

@ -215,7 +215,7 @@ public:
/// Returns true, if the session has expired.
bool expired();
DB::KeeperApiVersion getApiVersion();
DB::KeeperApiVersion getApiVersion() const;
/// Create a znode.
/// Throw an exception if something went wrong.
@ -674,4 +674,20 @@ bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config);
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config);
template <typename Client>
void addCheckNotExistsRequest(Coordination::Requests & requests, const Client & client, const std::string & path)
{
if (client.getApiVersion() >= DB::KeeperApiVersion::WITH_CHECK_NOT_EXISTS)
{
auto request = std::make_shared<Coordination::CheckRequest>();
request->path = path;
request->not_exists = true;
requests.push_back(std::move(request));
return;
}
requests.push_back(makeCreateRequest(path, "", zkutil::CreateMode::Persistent));
requests.push_back(makeRemoveRequest(path, -1));
}
}

View File

@ -666,7 +666,15 @@ ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCheckResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCheckNotExistsResponse>());
return setTime(std::make_shared<ZooKeeperCheckResponse>());
}
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
{
std::shared_ptr<ZooKeeperMultiResponse> response;
@ -931,6 +939,8 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;
else if constexpr (num == OpNum::Multi)
res->operation_type = ZooKeeperMultiRequest::OperationType::Write;
else if constexpr (num == OpNum::CheckNotExists)
res->not_exists = true;
return res;
});
@ -956,6 +966,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
registerZooKeeperRequest<OpNum::CheckNotExists, ZooKeeperCheckRequest>(*this);
}
PathMatchResult matchPath(std::string_view path, std::string_view match_to)

View File

@ -390,12 +390,12 @@ struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
size_t bytesSize() const override { return ZooKeeperListResponse::bytesSize() - sizeof(stat); }
};
struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest
{
ZooKeeperCheckRequest() = default;
explicit ZooKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
OpNum getOpNum() const override { return OpNum::Check; }
OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
@ -408,7 +408,7 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
struct ZooKeeperCheckResponse : CheckResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
@ -417,6 +417,12 @@ struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
size_t bytesSize() const override { return CheckResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperCheckNotExistsResponse : public ZooKeeperCheckResponse
{
OpNum getOpNum() const override { return OpNum::CheckNotExists; }
using ZooKeeperCheckResponse::ZooKeeperCheckResponse;
};
/// This response may be received only as an element of responses in MultiResponse.
struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
{

View File

@ -26,6 +26,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::SetACL),
static_cast<int32_t>(OpNum::GetACL),
static_cast<int32_t>(OpNum::FilteredList),
static_cast<int32_t>(OpNum::CheckNotExists),
};
std::string toString(OpNum op_num)
@ -70,6 +71,8 @@ std::string toString(OpNum op_num)
return "GetACL";
case OpNum::FilteredList:
return "FilteredList";
case OpNum::CheckNotExists:
return "CheckNotExists";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -36,6 +36,7 @@ enum class OpNum : int32_t
// CH Keeper specific operations
FilteredList = 500,
CheckNotExists = 501,
SessionID = 997, /// Special internal request
};

View File

@ -1085,7 +1085,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
}
KeeperApiVersion ZooKeeper::getApiVersion()
KeeperApiVersion ZooKeeper::getApiVersion() const
{
return keeper_api_version;
}

View File

@ -179,7 +179,7 @@ public:
const Requests & requests,
MultiCallback callback) override;
DB::KeeperApiVersion getApiVersion() override;
DB::KeeperApiVersion getApiVersion() const override;
/// Without forcefully invalidating (finalizing) ZooKeeper session before
/// establishing a new one, there was a possibility that server is using

View File

@ -6,6 +6,7 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include "Coordination/KeeperConstants.h"
namespace DB
{
@ -257,19 +258,22 @@ public:
Coordination::Error tryCreate(const std::string & path, const std::string & data, int32_t mode, std::string & path_created)
{
path_created.clear();
auto error = access(
"tryCreate",
path,
[&]() { return keeper->tryCreate(path, data, mode, path_created); },
[&](Coordination::Error &)
[&](Coordination::Error & code)
{
try
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
if (!path_created.empty() && (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
{
keeper->remove(path);
keeper->remove(path_created);
if (unlikely(logger))
LOG_TRACE(logger, "ZooKeeperWithFaultInjection cleanup: seed={} func={} path={}", seed, "tryCreate", path);
LOG_TRACE(logger, "ZooKeeperWithFaultInjection cleanup: seed={} func={} path={} path_created={} code={}",
seed, "tryCreate", path, path_created, code);
}
}
catch (const zkutil::KeeperException & e)
@ -277,10 +281,11 @@ public:
if (unlikely(logger))
LOG_TRACE(
logger,
"ZooKeeperWithFaultInjection cleanup FAILED: seed={} func={} path={} code={} message={} ",
"ZooKeeperWithFaultInjection cleanup FAILED: seed={} func={} path={} path_created={} code={} message={} ",
seed,
"tryCreate",
path,
path_created,
e.code,
e.message());
}
@ -289,8 +294,8 @@ public:
/// collect ephemeral nodes when no fault was injected (to clean up later)
if (unlikely(fault_policy))
{
if (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral)
ephemeral_nodes.push_back(path);
if (!path_created.empty() && (mode == zkutil::CreateMode::EphemeralSequential || mode == zkutil::CreateMode::Ephemeral))
ephemeral_nodes.push_back(path_created);
}
return error;
@ -385,6 +390,11 @@ public:
ephemeral_nodes.clear();
}
KeeperApiVersion getApiVersion() const
{
return keeper->getApiVersion();
}
private:
void faultInjectionBefore(std::function<void()> fault_cleanup)
{

View File

@ -9,10 +9,11 @@ enum class KeeperApiVersion : uint8_t
{
ZOOKEEPER_COMPATIBLE = 0,
WITH_FILTERED_LIST,
WITH_MULTI_READ
WITH_MULTI_READ,
WITH_CHECK_NOT_EXISTS,
};
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_MULTI_READ;
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_CHECK_NOT_EXISTS;
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";

View File

@ -1449,24 +1449,39 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
explicit KeeperStorageCheckRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequestProcessor(zk_request_)
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local);
check_not_exists = zk_request->getOpNum() == Coordination::OpNum::CheckNotExists;
}
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
{
auto path = zk_request->getPath();
return storage.checkACL(check_not_exists ? parentPath(path) : path, Coordination::ACL::Read, session_id, is_local);
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
if (check_not_exists)
{
if (node && (request.version == -1 || request.version == node->stat.version))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
}
else
{
if (!node)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
if (request.version != -1 && request.version != node->stat.version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
}
return {};
}
@ -1497,17 +1512,22 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
if (check_not_exists)
{
on_error(Coordination::Error::ZNONODE);
}
else if (request.version != -1 && request.version != node_it->value.stat.version)
{
on_error(Coordination::Error::ZBADVERSION);
if (node_it != container.end() && (request.version == -1 || request.version == node_it->value.stat.version))
on_error(Coordination::Error::ZNODEEXISTS);
else
response.error = Coordination::Error::ZOK;
}
else
{
response.error = Coordination::Error::ZOK;
if (node_it == container.end())
on_error(Coordination::Error::ZNONODE);
else if (request.version != -1 && request.version != node_it->value.stat.version)
on_error(Coordination::Error::ZBADVERSION);
else
response.error = Coordination::Error::ZOK;
}
return response_ptr;
@ -1523,6 +1543,9 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
return processImpl<true>(storage, zxid);
}
private:
bool check_not_exists;
};
@ -1716,6 +1739,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
case Coordination::OpNum::CheckNotExists:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
@ -1971,6 +1995,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::MultiRead, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor>(*this);
}

View File

@ -2451,6 +2451,78 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
}
TEST_P(CoordinationTest, TestCheckNotExistsRequest)
{
using namespace DB;
using namespace Coordination;
KeeperStorage storage{500, "", keeper_context};
int32_t zxid = 0;
const auto create_path = [&](const auto & path)
{
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
int new_zxid = ++zxid;
create_request->path = path;
storage.preprocessRequest(create_request, 1, 0, new_zxid);
auto responses = storage.processRequest(create_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
};
const auto check_request = std::make_shared<ZooKeeperCheckRequest>();
check_request->path = "/test_node";
check_request->not_exists = true;
{
SCOPED_TRACE("CheckNotExists returns ZOK");
int new_zxid = ++zxid;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
create_path("/test_node");
auto node_it = storage.container.find("/test_node");
ASSERT_NE(node_it, storage.container.end());
auto node_version = node_it->value.stat.version;
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS");
int new_zxid = ++zxid;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS for same version");
int new_zxid = ++zxid;
check_request->version = node_version;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
{
SCOPED_TRACE("CheckNotExists returns ZOK for different version");
int new_zxid = ++zxid;
check_request->version = node_version + 1;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,

View File

@ -29,11 +29,6 @@
#define DEFAULT_INSERT_BLOCK_SIZE \
1048449 /// 1048576 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
/** The same, but for merge operations. Less DEFAULT_BLOCK_SIZE for saving RAM (since all the columns are read).
* Significantly less, since there are 10-way mergers.
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC 60
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
@ -83,4 +78,3 @@
#else
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 0
#endif

View File

@ -1048,7 +1048,7 @@ void DatabaseReplicated::dropReplica(
assert(!database || database_zookeeper_path == database->zookeeper_path);
if (full_replica_name.find('/') != std::string::npos)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name: {}", full_replica_name);
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid replica name, '/' is not allowed: {}", full_replica_name);
auto zookeeper = Context::getGlobalContextInstance()->getZooKeeper();

View File

@ -761,6 +761,9 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::blockToAttributes(c
auto & attribute = attributes[attribute_index];
bool attribute_is_nullable = attribute.is_nullable_sets.has_value();
/// Number of elements should not take into account multiple attributes.
new_element_count = 0;
getAttributeContainers(attribute_index, [&](auto & containers)
{
using ContainerType = std::decay_t<decltype(containers.front())>;
@ -957,6 +960,15 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
for (size_t attribute_index = 0; attribute_index < attributes_size; ++attribute_index)
{
/// bucket_count should be a sum over all shards (CollectionsHolder),
/// but it should not be a sum over all attributes, since it is used to
/// calculate load_factor like this:
///
/// element_count / bucket_count
///
/// While element_count is a sum over all shards, not over all attributes.
bucket_count = 0;
getAttributeContainers(attribute_index, [&](const auto & containers)
{
for (const auto & container : containers)
@ -973,12 +985,12 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count = container.bucket_count();
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
bucket_count += container.getBufferSizeInCells();
}
}
});
@ -1002,12 +1014,12 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
if constexpr (sparse)
{
bytes_allocated += container.size() * (sizeof(KeyType));
bucket_count = container.bucket_count();
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count = container.getBufferSizeInCells();
bucket_count += container.getBufferSizeInCells();
}
}
}

View File

@ -493,11 +493,6 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
chassert(file_offset_of_buffer_end > completed_range.right);
if (read_type == ReadType::CACHED)
{
chassert(current_file_segment->getDownloadedSize(true) == current_file_segment->range().size());
}
file_segments->popFront();
if (file_segments->empty())
return false;

View File

@ -1,6 +1,6 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeInterval.h>
@ -25,7 +25,7 @@ class FunctionDateTrunc : public IFunction
public:
static constexpr auto name = "dateTrunc";
explicit FunctionDateTrunc(ContextPtr context_) : context(context_) {}
explicit FunctionDateTrunc(ContextPtr context_) : context(context_) { }
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionDateTrunc>(context); }
@ -39,51 +39,58 @@ public:
{
/// The first argument is a constant string with the name of datepart.
auto result_type_is_date = false;
intermediate_type_is_date = false;
String datepart_param;
auto check_first_argument = [&] {
auto check_first_argument = [&]
{
const ColumnConst * datepart_column = checkAndGetColumnConst<ColumnString>(arguments[0].column.get());
if (!datepart_column)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be constant string: "
"name of datepart", getName());
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument for function {} must be constant string: "
"name of datepart",
getName());
datepart_param = datepart_column->getValue<String>();
if (datepart_param.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "First argument (name of datepart) for function {} cannot be empty",
getName());
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "First argument (name of datepart) for function {} cannot be empty", getName());
if (!IntervalKind::tryParseString(datepart_param, datepart_kind))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} doesn't look like datepart name in {}", datepart_param, getName());
result_type_is_date = (datepart_kind == IntervalKind::Year)
|| (datepart_kind == IntervalKind::Quarter) || (datepart_kind == IntervalKind::Month)
|| (datepart_kind == IntervalKind::Week);
intermediate_type_is_date = (datepart_kind == IntervalKind::Year) || (datepart_kind == IntervalKind::Quarter)
|| (datepart_kind == IntervalKind::Month) || (datepart_kind == IntervalKind::Week);
};
bool second_argument_is_date = false;
auto check_second_argument = [&] {
auto check_second_argument = [&]
{
if (!isDate(arguments[1].type) && !isDateTime(arguments[1].type) && !isDateTime64(arguments[1].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of 2nd argument of function {}. "
"Should be a date or a date with time", arguments[1].type->getName(), getName());
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of 2nd argument of function {}. "
"Should be a date or a date with time",
arguments[1].type->getName(),
getName());
second_argument_is_date = isDate(arguments[1].type);
if (second_argument_is_date && ((datepart_kind == IntervalKind::Hour)
|| (datepart_kind == IntervalKind::Minute) || (datepart_kind == IntervalKind::Second)))
if (second_argument_is_date
&& ((datepart_kind == IntervalKind::Hour) || (datepart_kind == IntervalKind::Minute)
|| (datepart_kind == IntervalKind::Second)))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type Date of argument for function {}", getName());
};
auto check_timezone_argument = [&] {
auto check_timezone_argument = [&]
{
if (!WhichDataType(arguments[2].type).isString())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}. "
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}. "
"This argument is optional and must be a constant string with timezone name",
arguments[2].type->getName(), getName());
if (second_argument_is_date && result_type_is_date)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The timezone argument of function {} with datepart '{}' "
"is allowed only when the 2nd argument has the type DateTime",
getName(), datepart_param);
arguments[2].type->getName(),
getName());
};
if (arguments.size() == 2)
@ -99,15 +106,14 @@ public:
}
else
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
getName(), arguments.size());
getName(),
arguments.size());
}
if (result_type_is_date)
return std::make_shared<DataTypeDate>();
else
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 1));
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 1));
}
bool useDefaultImplementationForConstants() const override { return true; }
@ -124,26 +130,40 @@ public:
auto to_start_of_interval = FunctionFactory::instance().get("toStartOfInterval", context);
ColumnPtr truncated_column;
auto date_type = std::make_shared<DataTypeDate>();
if (arguments.size() == 2)
return to_start_of_interval->build(temp_columns)->execute(temp_columns, result_type, input_rows_count);
truncated_column = to_start_of_interval->build(temp_columns)
->execute(temp_columns, intermediate_type_is_date ? date_type : result_type, input_rows_count);
else
{
temp_columns[2] = arguments[2];
truncated_column = to_start_of_interval->build(temp_columns)
->execute(temp_columns, intermediate_type_is_date ? date_type : result_type, input_rows_count);
}
temp_columns[2] = arguments[2];
return to_start_of_interval->build(temp_columns)->execute(temp_columns, result_type, input_rows_count);
if (!intermediate_type_is_date)
return truncated_column;
ColumnsWithTypeAndName temp_truncated_column(1);
temp_truncated_column[0] = {truncated_column, date_type, ""};
auto to_date_time_or_default = FunctionFactory::instance().get("toDateTime", context);
return to_date_time_or_default->build(temp_truncated_column)->execute(temp_truncated_column, result_type, input_rows_count);
}
bool hasInformationAboutMonotonicity() const override
{
return true;
}
bool hasInformationAboutMonotonicity() const override { return true; }
Monotonicity getMonotonicityForRange(const IDataType &, const Field &, const Field &) const override
{
return { .is_monotonic = true, .is_always_monotonic = true };
return {.is_monotonic = true, .is_always_monotonic = true};
}
private:
ContextPtr context;
mutable IntervalKind::Kind datepart_kind = IntervalKind::Kind::Second;
mutable bool intermediate_type_is_date = false;
};
}

View File

@ -150,7 +150,7 @@ namespace
if (text == "bc")
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Era BC exceeds the range of DateTime");
else if (text != "ad")
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Unknown era {}", text);
throw Exception(ErrorCodes::CANNOT_PARSE_DATETIME, "Unknown era {} (expected 'ad' or 'bc')", text);
}
void setCentury(Int32 century)

View File

@ -33,7 +33,7 @@ size_t HTTPChunkedReadBuffer::readChunkHeader()
} while (!in->eof() && isHexDigit(*in->position()));
if (res > max_chunk_size)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Chunk size exceeded the limit");
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Chunk size exceeded the limit (max size: {})", max_chunk_size);
/// NOTE: If we want to read any chunk extensions, it should be done here.

View File

@ -33,7 +33,7 @@ String getExceptionEntryWithFileName(const ReadBuffer & in)
if (filename.empty())
return "";
return fmt::format("; While reading from: {}", filename);
return fmt::format(": While reading from: {}", filename);
}
}

View File

@ -43,7 +43,7 @@ FileSegment::FileSegment(
, key_metadata(key_metadata_)
, queue_iterator(queue_iterator_)
, cache(cache_)
#ifndef NDEBUG
#ifdef ABORT_ON_LOGICAL_ERROR
, log(&Poco::Logger::get(fmt::format("FileSegment({}) : {}", key_.toString(), range().toString())))
#else
, log(&Poco::Logger::get("FileSegment"))
@ -56,6 +56,7 @@ FileSegment::FileSegment(
/// someone will _potentially_ want to download it (after calling getOrSetDownloader()).
case (State::EMPTY):
{
chassert(key_metadata.lock());
break;
}
/// DOWNLOADED is used either on initial cache metadata load into memory on server startup
@ -65,6 +66,7 @@ FileSegment::FileSegment(
reserved_size = downloaded_size = size_;
chassert(fs::file_size(getPathInLocalCache()) == size_);
chassert(queue_iterator);
chassert(key_metadata.lock());
break;
}
case (State::DETACHED):
@ -91,8 +93,16 @@ String FileSegment::getPathInLocalCache() const
return getKeyMetadata()->getFileSegmentPath(*this);
}
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock &)
void FileSegment::setDownloadState(State state, const FileSegmentGuard::Lock & lock)
{
if (isCompleted(false) && state != State::DETACHED)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Updating state to {} of file segment is not allowed, because it is already completed ({})",
stateToString(state), getInfoForLogUnlocked(lock));
}
LOG_TEST(log, "Updated state from {} to {}", stateToString(download_state), stateToString(state));
download_state = state;
}
@ -182,12 +192,13 @@ String FileSegment::getOrSetDownloader()
if (current_downloader.empty())
{
const auto caller_id = getCallerId();
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED || !caller_id.starts_with("None");
bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED;
if (!allow_new_downloader)
return "notAllowed:" + stateToString(download_state);
current_downloader = downloader_id = caller_id;
setDownloadState(State::DOWNLOADING, lock);
chassert(key_metadata.lock());
}
return current_downloader;

View File

@ -278,8 +278,8 @@ void CacheMetadata::doCleanup()
}
catch (...)
{
chassert(false);
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
}
}

View File

@ -165,6 +165,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -220,6 +221,7 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -254,6 +256,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -331,6 +334,7 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
builder.getNumStreams(),
sort_description,
num_rows,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
builder.addTransform(std::move(transform));

View File

@ -87,6 +87,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},
{"CheckNotExists", static_cast<Int16>(Coordination::OpNum::CheckNotExists)},
});
auto error_enum = getCoordinationErrorCodesEnumType();

View File

@ -1429,10 +1429,12 @@ bool ParserAlias::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!allow_alias_without_as_keyword && !has_as_word)
return false;
bool is_quoted = pos->type == TokenType::QuotedIdentifier;
if (!id_p.parse(pos, node, expected))
return false;
if (!has_as_word)
if (!has_as_word && !is_quoted)
{
/** In this case, the alias can not match the keyword -
* so that in the query "SELECT x FROM t", the word FROM was not considered an alias,

View File

@ -13,14 +13,18 @@ class AggregatingSortedTransform final : public IMergingTransform<AggregatingSor
{
public:
AggregatingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
max_block_size)
max_block_size_rows,
max_block_size_bytes)
{
}

View File

@ -159,8 +159,11 @@ AggregatingSortedAlgorithm::SimpleAggregateDescription::~SimpleAggregateDescript
AggregatingSortedAlgorithm::AggregatingMergedData::AggregatingMergedData(
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_), def(def_)
MutableColumns columns_,
UInt64 max_block_size_rows_,
UInt64 max_block_size_bytes_,
ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_rows_, max_block_size_bytes_), def(def_)
{
initAggregateDescription();
@ -257,10 +260,14 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::initAggregateDescription
AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
const Block & header_, size_t num_inputs, SortDescription description_, size_t max_block_size)
const Block & header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, description_)
, columns_definition(defineColumns(header_, description_))
, merged_data(getMergedColumns(header_, columns_definition), max_block_size, columns_definition)
, merged_data(getMergedColumns(header_, columns_definition), max_block_size_rows_, max_block_size_bytes_, columns_definition)
{
}

View File

@ -18,8 +18,11 @@ class AggregatingSortedAlgorithm final : public IMergingAlgorithmWithDelayedChun
{
public:
AggregatingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size);
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -96,7 +99,11 @@ private:
using MergedData::insertRow;
public:
AggregatingMergedData(MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_);
AggregatingMergedData(
MutableColumns columns_,
UInt64 max_block_size_rows_,
UInt64 max_block_size_bytes_,
ColumnsDefinition & def_);
/// Group is a group of rows with the same sorting key. It represents single row in result.
/// Algorithm is: start group, add several rows, finish group.

View File

@ -26,12 +26,13 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
SortDescription description_,
const String & sign_column,
bool only_positive_sign_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
, sign_column_number(header_.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, log(log_)

View File

@ -32,7 +32,8 @@ public:
SortDescription description_,
const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
@ -74,4 +75,3 @@ private:
};
}

View File

@ -30,9 +30,9 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_)
: header(header_), num_inputs(num_inputs_), params(params_), max_block_size(max_block_size_), max_block_bytes(max_block_bytes_)
size_t max_block_size_rows_,
size_t max_block_size_bytes_)
: header(header_), num_inputs(num_inputs_), params(params_), max_block_size_rows(max_block_size_rows_), max_block_size_bytes(max_block_size_bytes_)
{
for (const auto & column_description : description_)
description.emplace_back(column_description, header_.getPositionByName(column_description.column_name));
@ -118,7 +118,7 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
inputs_to_update.pop_back();
/// Do not merge blocks, if there are too few rows or bytes.
if (accumulated_rows >= max_block_size || accumulated_bytes >= max_block_bytes)
if (accumulated_rows >= max_block_size_rows || accumulated_bytes >= max_block_size_bytes)
status.chunk = prepareToMerge();
return status;

View File

@ -42,8 +42,8 @@ public:
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_);
size_t max_block_size_rows_,
size_t max_block_size_bytes_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -79,8 +79,8 @@ private:
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescriptionWithPositions description;
size_t max_block_size;
size_t max_block_bytes;
size_t max_block_size_rows;
size_t max_block_size_bytes;
Inputs current_inputs;

View File

@ -42,11 +42,12 @@ GraphiteRollupSortedAlgorithm::GraphiteRollupSortedAlgorithm(
const Block & header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Graphite::Params params_,
time_t time_of_merge_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), nullptr, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), false, max_block_size)
, merged_data(header_.cloneEmptyColumns(), false, max_block_size_rows_, max_block_size_bytes_)
, params(std::move(params_))
, time_of_merge(time_of_merge_)
{

View File

@ -22,9 +22,13 @@ class GraphiteRollupSortedAlgorithm final : public IMergingAlgorithmWithSharedCh
{
public:
GraphiteRollupSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_);
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Graphite::Params params_,
time_t time_of_merge_);
Status merge() override;

View File

@ -19,8 +19,8 @@ namespace ErrorCodes
class MergedData
{
public:
explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_)
: columns(std::move(columns_)), max_block_size(max_block_size_), use_average_block_size(use_average_block_size_)
explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_, UInt64 max_block_size_bytes_)
: columns(std::move(columns_)), max_block_size(max_block_size_), max_block_size_bytes(max_block_size_bytes_), use_average_block_size(use_average_block_size_)
{
}
@ -117,6 +117,16 @@ public:
if (merged_rows >= max_block_size)
return true;
/// Never return more than max_block_size_bytes
if (max_block_size_bytes)
{
size_t merged_bytes = 0;
for (const auto & column : columns)
merged_bytes += column->allocatedBytes();
if (merged_bytes >= max_block_size_bytes)
return true;
}
if (!use_average_block_size)
return false;
@ -143,8 +153,9 @@ protected:
UInt64 total_chunks = 0;
UInt64 total_allocated_bytes = 0;
const UInt64 max_block_size;
const bool use_average_block_size;
const UInt64 max_block_size = 0;
const UInt64 max_block_size_bytes = 0;
const bool use_average_block_size = false;
bool need_flush = false;
};

View File

@ -11,13 +11,14 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
Block header_,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_,
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: header(std::move(header_))
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size_, max_block_size_bytes_)
, description(description_)
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)

View File

@ -17,7 +17,8 @@ public:
Block header_,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_,
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,

View File

@ -17,12 +17,13 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size), cleanup(cleanup_)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);

View File

@ -23,7 +23,8 @@ public:
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false);

View File

@ -497,8 +497,8 @@ static void setRow(Row & row, const ColumnRawPtrs & raw_columns, size_t row_num,
SummingSortedAlgorithm::SummingMergedData::SummingMergedData(
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_)
MutableColumns columns_, UInt64 max_block_size_rows_, UInt64 max_block_size_bytes_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_rows_, max_block_size_bytes_)
, def(def_)
{
current_row.resize(def.column_names.size());
@ -686,10 +686,11 @@ SummingSortedAlgorithm::SummingSortedAlgorithm(
SortDescription description_,
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, std::move(description_))
, columns_definition(defineColumns(header_, description, column_names_to_sum, partition_key_columns))
, merged_data(getMergedDataColumns(header_, columns_definition), max_block_size, columns_definition)
, merged_data(getMergedDataColumns(header_, columns_definition), max_block_size_rows, max_block_size_bytes, columns_definition)
{
}

View File

@ -22,7 +22,8 @@ public:
const Names & column_names_to_sum,
/// List of partition key columns. They have to be excluded.
const Names & partition_key_columns,
size_t max_block_size);
size_t max_block_size_rows,
size_t max_block_size_bytes);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -63,7 +64,7 @@ public:
using MergedData::insertRow;
public:
SummingMergedData(MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_);
SummingMergedData(MutableColumns columns_, UInt64 max_block_size_rows, UInt64 max_block_size_bytes_, ColumnsDefinition & def_);
void startGroup(ColumnRawPtrs & raw_columns, size_t row);
void finishGroup();

View File

@ -12,13 +12,14 @@ VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
size_t num_inputs,
SortDescription description_,
const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
/// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, current_keys(max_rows_in_queue)
{
sign_column_number = header_.getPositionByName(sign_column_);

View File

@ -20,7 +20,8 @@ public:
VersionedCollapsingAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);

View File

@ -16,7 +16,8 @@ public:
SortDescription description_,
const String & sign_column,
bool only_positive_sign,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
@ -26,7 +27,8 @@ public:
std::move(description_),
sign_column,
only_positive_sign,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
&Poco::Logger::get("CollapsingSortedTransform"),
out_row_sources_buf_,
use_average_block_sizes)

View File

@ -17,16 +17,16 @@ public:
size_t num_inputs,
AggregatingTransformParamsPtr params,
SortDescription description,
size_t max_block_size,
size_t max_block_bytes)
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingTransform(
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
params,
std::move(description),
max_block_size,
max_block_bytes)
max_block_size_rows,
max_block_size_bytes)
{
}

View File

@ -11,15 +11,20 @@ class GraphiteRollupSortedTransform final : public IMergingTransform<GraphiteRol
{
public:
GraphiteRollupSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
Graphite::Params params_,
time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
max_block_size,
max_block_size_rows,
max_block_size_bytes,
std::move(params_),
time_of_merge_)
{


@ -11,7 +11,8 @@ MergingSortedTransform::MergingSortedTransform(
const Block & header,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
@ -29,7 +30,8 @@ MergingSortedTransform::MergingSortedTransform(
header,
num_inputs,
description_,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
sorting_queue_strategy,
limit_,
out_row_sources_buf_,


@ -15,7 +15,8 @@ public:
const Block & header,
size_t num_inputs,
const SortDescription & description,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,


@ -15,7 +15,8 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_,
const String & is_deleted_column, const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false)
@ -26,7 +27,8 @@ public:
std::move(description_),
is_deleted_column,
version_column,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes,
cleanup)


@ -17,7 +17,9 @@ public:
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
size_t max_block_size_rows,
size_t max_block_size_bytes
)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
@ -25,7 +27,8 @@ public:
std::move(description_),
column_names_to_sum,
partition_key_columns,
max_block_size)
max_block_size_rows,
max_block_size_bytes)
{
}


@ -15,7 +15,8 @@ public:
VersionedCollapsingTransform(
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
@ -24,7 +25,8 @@ public:
num_inputs,
std::move(description_),
sign_column_,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes)
{


@ -6,12 +6,14 @@
#include <Interpreters/getColumnFromBlock.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Storages/StorageSnapshot.h>
#include <Storages/StorageMemory.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISource.h>
#include <Processors/Sources/NullSource.h>
namespace DB
{
@ -93,29 +95,39 @@ private:
InitializerFunc initializer_func;
};
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(Pipe pipe_) :
SourceStepWithFilter(DataStream{.header = pipe_.getHeader()}),
pipe(std::move(pipe_))
ReadFromMemoryStorageStep::ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
const size_t num_streams_,
const bool delay_read_for_global_sub_queries_) :
SourceStepWithFilter(DataStream{.header=storage_snapshot_->getSampleBlockForColumns(columns_to_read_)}),
columns_to_read(columns_to_read_),
storage_snapshot(storage_snapshot_),
num_streams(num_streams_),
delay_read_for_global_sub_queries(delay_read_for_global_sub_queries_)
{
}
void ReadFromMemoryStorageStep::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
// use move - make sure that the call will only be made once.
auto pipe = makePipe();
if (pipe.empty())
{
assert(output_stream != std::nullopt);
pipe = Pipe(std::make_shared<NullSource>(output_stream->header));
}
pipeline.init(std::move(pipe));
}
Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
const bool delay_read_for_global_sub_queries_)
Pipe ReadFromMemoryStorageStep::makePipe()
{
storage_snapshot_->check(columns_to_read_);
storage_snapshot->check(columns_to_read);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot_->data);
const auto & snapshot_data = assert_cast<const StorageMemory::SnapshotData &>(*storage_snapshot->data);
auto current_data = snapshot_data.blocks;
if (delay_read_for_global_sub_queries_)
if (delay_read_for_global_sub_queries)
{
/// Note: for global subquery we use single source.
/// Mainly, the reason is that at this point table is empty,
@ -126,8 +138,8 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
/// Since no other manipulation with data is done, multiple sources shouldn't give any profit.
return Pipe(std::make_shared<MemorySource>(
columns_to_read_,
storage_snapshot_,
columns_to_read,
storage_snapshot,
nullptr /* data */,
nullptr /* parallel execution index */,
[current_data](std::shared_ptr<const Blocks> & data_to_initialize)
@ -138,16 +150,16 @@ Pipe ReadFromMemoryStorageStep::makePipe(const Names & columns_to_read_,
size_t size = current_data->size();
if (num_streams_ > size)
num_streams_ = size;
if (num_streams > size)
num_streams = size;
Pipes pipes;
auto parallel_execution_index = std::make_shared<std::atomic<size_t>>(0);
for (size_t stream = 0; stream < num_streams_; ++stream)
for (size_t stream = 0; stream < num_streams; ++stream)
{
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read_, storage_snapshot_, current_data, parallel_execution_index));
pipes.emplace_back(std::make_shared<MemorySource>(columns_to_read, storage_snapshot, current_data, parallel_execution_index));
}
return Pipe::unitePipes(std::move(pipes));
}
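
The note above about global subqueries explains why a single source with delayed initialization is used: when the reading step is constructed, the Memory table is still empty and is only filled later. A standalone sketch of that delayed-initialization idea follows; LazySource and Rows are made-up stand-ins for the MemorySource/InitializerFunc machinery, kept to plain standard-library types.

#include <cstddef>
#include <functional>
#include <iostream>
#include <memory>
#include <optional>
#include <vector>

using Rows = std::vector<int>;

// The source captures an initializer callback instead of the data itself;
// the data is resolved on the first read, not at construction time.
class LazySource
{
public:
    explicit LazySource(std::function<std::shared_ptr<const Rows>()> initializer_)
        : initializer(std::move(initializer_)) {}

    std::optional<int> read()
    {
        if (!rows)
            rows = initializer();          // table contents are looked up only now
        if (pos >= rows->size())
            return std::nullopt;
        return (*rows)[pos++];
    }

private:
    std::function<std::shared_ptr<const Rows>()> initializer;
    std::shared_ptr<const Rows> rows;
    size_t pos = 0;
};

int main()
{
    auto table = std::make_shared<Rows>();                       // empty when the plan is built
    LazySource source([table] { return std::shared_ptr<const Rows>(table); });

    table->insert(table->end(), {1, 2, 3});                      // filled later, before execution
    while (auto value = source.read())
        std::cout << *value << '\n';                             // prints 1 2 3
}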


@ -5,6 +5,7 @@
#include <Interpreters/TreeRewriter.h>
#include <Processors/QueryPlan/SourceStepWithFilter.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/SelectQueryInfo.h>
namespace DB
{
@ -14,7 +15,10 @@ class QueryPipelineBuilder;
class ReadFromMemoryStorageStep final : public SourceStepWithFilter
{
public:
explicit ReadFromMemoryStorageStep(Pipe pipe_);
ReadFromMemoryStorageStep(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
ReadFromMemoryStorageStep() = delete;
ReadFromMemoryStorageStep(const ReadFromMemoryStorageStep &) = delete;
@ -27,14 +31,15 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
static Pipe makePipe(const Names & columns_to_read_,
const StorageSnapshotPtr & storage_snapshot_,
size_t num_streams_,
bool delay_read_for_global_sub_queries_);
private:
static constexpr auto name = "ReadFromMemoryStorage";
Pipe pipe;
Names columns_to_read;
StorageSnapshotPtr storage_snapshot;
size_t num_streams;
bool delay_read_for_global_sub_queries;
Pipe makePipe();
};
}


@ -852,7 +852,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
pipe.addTransform(std::move(transform));
}
@ -898,31 +898,31 @@ static void addMergingFinal(
{
case MergeTreeData::MergingParams::Ordinary:
return std::make_shared<MergingSortedTransform>(header, num_outputs,
sort_description, max_block_size, SortingQueueStrategy::Batch);
sort_description, max_block_size, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, num_outputs,
sort_description, merging_params.sign_column, true, max_block_size);
sort_description, merging_params.sign_column, true, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, num_outputs,
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size);
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, num_outputs,
sort_description, max_block_size);
sort_description, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
sort_description, merging_params.sign_column, max_block_size);
sort_description, merging_params.sign_column, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
sort_description, max_block_size, merging_params.graphite_params, now);
sort_description, max_block_size, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
}
UNREACHABLE();


@ -176,6 +176,7 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
@ -269,6 +270,7 @@ void SortingStep::fullSort(
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);


@ -186,6 +186,7 @@ void MergeSortingTransform::consume(Chunk chunk)
0,
description,
max_merged_block_size,
/*max_merged_block_size_bytes*/0,
SortingQueueStrategy::Batch,
limit,
/*always_read_till_end_=*/ false,


@ -2012,7 +2012,7 @@ struct WindowFunctionNtile final : public WindowFunction
if (!buckets)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must > 0");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "ntile's argument must be greater than 0");
}
}
// new partition


@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));


@ -1090,7 +1090,11 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"in a single ALTER query", backQuote(column_name));
if (command.codec)
{
if (all_columns.hasAlias(column_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
}
auto column_default = all_columns.getDefault(column_name);
if (column_default)
{


@ -659,6 +659,12 @@ bool ColumnsDescription::hasPhysical(const String & column_name) const
it->default_desc.kind != ColumnDefaultKind::Alias && it->default_desc.kind != ColumnDefaultKind::Ephemeral;
}
bool ColumnsDescription::hasAlias(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
return it != columns.get<1>().end() && it->default_desc.kind == ColumnDefaultKind::Alias;
}
bool ColumnsDescription::hasColumnOrSubcolumn(GetColumnsOptions::Kind kind, const String & column_name) const
{
auto it = columns.get<1>().find(column_name);


@ -177,6 +177,7 @@ public:
Names getNamesOfPhysical() const;
bool hasPhysical(const String & column_name) const;
bool hasAlias(const String & column_name) const;
bool hasColumnOrSubcolumn(GetColumnsOptions::Kind kind, const String & column_name) const;
bool hasColumnOrNested(GetColumnsOptions::Kind kind, const String & column_name) const;
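
The new ColumnsDescription::hasAlias() added here is used by the ALTER validation a few hunks earlier to reject commands that attach a compression codec to an ALIAS column (an alias is a computed expression and stores no data of its own). A small self-contained sketch of that guard, with ColumnKind and validateCodecTarget as hypothetical names rather than ClickHouse APIs:

#include <map>
#include <optional>
#include <stdexcept>
#include <string>

enum class ColumnKind { Ordinary, Alias, Ephemeral };

// Throws if a codec is requested for a column whose kind is ALIAS.
void validateCodecTarget(const std::map<std::string, ColumnKind> & columns,
                         const std::string & column_name,
                         const std::optional<std::string> & codec)
{
    if (!codec)
        return;
    auto it = columns.find(column_name);
    if (it != columns.end() && it->second == ColumnKind::Alias)
        throw std::runtime_error("Cannot specify codec for column type ALIAS");
}

int main()
{
    std::map<std::string, ColumnKind> columns{{"x", ColumnKind::Ordinary}, {"x_alias", ColumnKind::Alias}};
    validateCodecTarget(columns, "x", std::string("ZSTD"));           // accepted
    try
    {
        validateCodecTarget(columns, "x_alias", std::string("ZSTD")); // rejected
    }
    catch (const std::exception &)
    {
        // expected: codecs cannot be attached to ALIAS columns
    }
}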


@ -24,7 +24,7 @@ template <typename T>
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path)
{
constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
static constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
String path;
@ -42,16 +42,15 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
if constexpr (async_insert)
{
for (const auto & single_dedup_path : deduplication_path)
{
ops.emplace_back(zkutil::makeCreateRequest(single_dedup_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(single_dedup_path, -1));
}
zkutil::addCheckNotExistsRequest(ops, *zookeeper_, single_dedup_path);
}
else
{
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper_, deduplication_path);
}
auto deduplication_path_ops_size = ops.size();
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
Coordination::Responses responses;
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
@ -60,9 +59,10 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
if constexpr (async_insert)
{
auto failed_idx = zkutil::getFailedOpIndex(Coordination::Error::ZNODEEXISTS, responses);
if (failed_idx < deduplication_path.size() * 2)
if (failed_idx < deduplication_path_ops_size)
{
const String & failed_op_path = deduplication_path[failed_idx / 2];
const String & failed_op_path = ops[failed_idx]->getPath();
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",

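The hunk above replaces the Create+Remove pair that used to emulate "this node must not exist" with a single zkutil::addCheckNotExistsRequest per deduplication path, and looks up the failing path from the op list instead of dividing the failed index by two. A simplified, ZooKeeper-free sketch of the op-list bookkeeping (Op and its fields are illustrative, not the zkutil types):

#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct Op
{
    enum class Type { CheckNotExists, CreateSequential };
    Type type;
    std::string path;
};

int main()
{
    std::vector<std::string> deduplication_paths{"/dedup/a", "/dedup/b"};

    std::vector<Op> ops;
    for (const auto & path : deduplication_paths)
        ops.push_back({Op::Type::CheckNotExists, path});        // one op per path instead of two
    const size_t deduplication_path_ops_size = ops.size();
    ops.push_back({Op::Type::CreateSequential, "/locks/block-"});

    // Suppose the coordination service reports that op #1 failed with "node exists".
    size_t failed_idx = 1;
    if (failed_idx < deduplication_path_ops_size)
        std::cout << "Deduplication path already exists: " << ops[failed_idx].path << '\n';
}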

@ -31,6 +31,7 @@ MergeListElement::MergeListElement(
source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath());
total_size_bytes_compressed += source_part->getBytesOnDisk();
total_size_bytes_uncompressed += source_part->getTotalColumnsSize().data_uncompressed;
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
}
@ -57,6 +58,7 @@ MergeInfo MergeListElement::getInfo() const
res.progress = progress.load(std::memory_order_relaxed);
res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
res.total_size_marks = total_size_marks;
res.total_rows_count = total_rows_count;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);


@ -40,6 +40,7 @@ struct MergeInfo
Float64 progress;
UInt64 num_parts;
UInt64 total_size_bytes_compressed;
UInt64 total_size_bytes_uncompressed;
UInt64 total_size_marks;
UInt64 total_rows_count;
UInt64 bytes_read_uncompressed;
@ -82,6 +83,7 @@ struct MergeListElement : boost::noncopyable
std::atomic<bool> is_cancelled{};
UInt64 total_size_bytes_compressed{};
UInt64 total_size_bytes_uncompressed{};
UInt64 total_size_marks{};
UInt64 total_rows_count{};
std::atomic<UInt64> bytes_read_uncompressed{};
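
The merge list entry now also accumulates the uncompressed byte size (and mark count) of the source parts, so later code such as the Vertical-merge decision further down in this diff and the new system.merges column can reason about merge size in bytes as well as rows. A tiny illustrative aggregation, with PartStats and MergeTotals as made-up stand-ins for the data part interface:

#include <cstdint>
#include <iostream>
#include <vector>

struct PartStats
{
    uint64_t bytes_on_disk;
    uint64_t bytes_uncompressed;
    uint64_t marks;
    uint64_t rows;
};

struct MergeTotals
{
    uint64_t total_size_bytes_compressed = 0;
    uint64_t total_size_bytes_uncompressed = 0;
    uint64_t total_size_marks = 0;
    uint64_t total_rows_count = 0;
};

// Sums the per-part statistics, mirroring the per-source-part loop above.
MergeTotals accumulateParts(const std::vector<PartStats> & parts)
{
    MergeTotals totals;
    for (const auto & part : parts)
    {
        totals.total_size_bytes_compressed += part.bytes_on_disk;
        totals.total_size_bytes_uncompressed += part.bytes_uncompressed;
        totals.total_size_marks += part.marks;
        totals.total_rows_count += part.rows;
    }
    return totals;
}

int main()
{
    auto totals = accumulateParts({{1 << 20, 4 << 20, 128, 100000}, {2 << 20, 9 << 20, 256, 250000}});
    std::cout << totals.total_size_bytes_uncompressed << " uncompressed bytes, "
              << totals.total_rows_count << " rows\n";
}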


@ -921,7 +921,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// If merge is vertical we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
UInt64 merge_block_size = data_settings->merge_max_block_size;
/// There is no sense to have the block size bigger than one granule for merge operations.
const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
switch (ctx->merging_params.mode)
{
@ -930,7 +932,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
header,
pipes.size(),
sort_description,
merge_block_size,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
@ -942,35 +945,35 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column, false,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size);
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size);
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size,
header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes,
ctx->merging_params.graphite_params, global_ctx->time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
}
@ -1011,7 +1014,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const
{
const size_t sum_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
const size_t total_rows_count = global_ctx->merge_list_element_ptr->total_rows_count;
const size_t total_size_bytes_uncompressed = global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed;
const auto data_settings = global_ctx->data->getSettings();
if (global_ctx->deduplicate)
@ -1042,11 +1046,13 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_rows = total_rows_count >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_bytes = total_size_bytes_uncompressed >= data_settings->vertical_merge_algorithm_min_bytes_to_activate;
bool no_parts_overflow = global_ctx->future_part->parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
auto merge_alg = (is_supported_storage && enough_total_rows && enough_total_bytes && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
return merge_alg;
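
chooseMergeAlgorithm() above now requires the merge to be large enough by both row count and uncompressed byte size before the Vertical algorithm is picked, in addition to the existing column-count and part-count conditions. A standalone sketch of that decision; the thresholds passed in main() are placeholders, not the real setting defaults:

#include <cstddef>
#include <iostream>

enum class MergeAlgorithm { Horizontal, Vertical };

// Vertical merge is chosen only when every condition holds.
MergeAlgorithm chooseMergeAlgorithm(size_t total_rows, size_t total_bytes_uncompressed,
                                    size_t ordinary_columns, size_t parts,
                                    size_t min_rows, size_t min_bytes,
                                    size_t min_columns, size_t max_parts)
{
    const bool enough_rows = total_rows >= min_rows;
    const bool enough_bytes = total_bytes_uncompressed >= min_bytes;
    const bool enough_columns = ordinary_columns >= min_columns;
    const bool parts_fit = parts <= max_parts;
    return (enough_rows && enough_bytes && enough_columns && parts_fit)
        ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
}

int main()
{
    auto alg = chooseMergeAlgorithm(/*total_rows=*/200000, /*total_bytes_uncompressed=*/0,
                                    /*ordinary_columns=*/20, /*parts=*/5,
                                    /*min_rows=*/131072, /*min_bytes=*/1,
                                    /*min_columns=*/11, /*max_parts=*/255);
    std::cout << (alg == MergeAlgorithm::Vertical ? "Vertical" : "Horizontal") << '\n';  // Horizontal: too few bytes
}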


@ -280,23 +280,23 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1);
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,
false, block_size + 1, &Poco::Logger::get("MergeTreeDataWriter"));
false, block_size + 1, /*block_size_bytes=*/0, &Poco::Logger::get("MergeTreeDataWriter"));
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedAlgorithm>(
block, 1, sort_description, merging_params.columns_to_sum,
partition_key_columns, block_size + 1);
partition_key_columns, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1);
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingAlgorithm>(
block, 1, sort_description, merging_params.sign_column, block_size + 1);
block, 1, sort_description, merging_params.sign_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedAlgorithm>(
block, 1, sort_description, block_size + 1, merging_params.graphite_params, time(nullptr));
block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0, merging_params.graphite_params, time(nullptr));
}
UNREACHABLE();


@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
namespace
{
@ -20,7 +21,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
part->getDataPartStoragePtr(),
std::make_shared<LoadedMergeTreeDataPartInfoForReader>(part),
index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), mark_cache, uncompressed_cache,


@ -30,7 +30,7 @@ namespace ErrorCodes
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path_,
size_t marks_count_,
@ -39,7 +39,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_)
: data_part_storage(std::move(data_part_storage_))
: data_part_reader(data_part_reader_)
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, marks_count(marks_count_)
@ -98,6 +98,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
/// Memory for marks must not be accounted as memory usage for query, because they are stored in shared cache.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto data_part_storage = data_part_reader->getDataPartStorage();
size_t file_size = data_part_storage->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
@ -177,6 +179,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
{
MarkCache::MappedPtr loaded_marks;
auto data_part_storage = data_part_reader->getDataPartStorage();
if (mark_cache)
{
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
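
In the hunks above, MergeTreeMarksLoader stops holding a DataPartStoragePtr and instead keeps the part-info-for-reader object, asking it for the storage only when the marks are actually loaded, presumably so the loader always works with whatever storage the part currently has. A simplified sketch of that lazy-resolution pattern; Storage, PartInfoForReader and MarksLoader below are stand-ins, not the real classes:

#include <iostream>
#include <memory>
#include <string>

struct Storage
{
    std::string full_path;
};

struct PartInfoForReader
{
    virtual std::shared_ptr<Storage> getDataPartStorage() const = 0;
    virtual ~PartInfoForReader() = default;
};

struct PartInfo : PartInfoForReader
{
    std::shared_ptr<Storage> storage = std::make_shared<Storage>(Storage{"/data/part_1_1_0/"});
    std::shared_ptr<Storage> getDataPartStorage() const override { return storage; }
};

class MarksLoader
{
public:
    explicit MarksLoader(std::shared_ptr<const PartInfoForReader> part_reader_) : part_reader(std::move(part_reader_)) {}

    void loadMarks() const
    {
        auto storage = part_reader->getDataPartStorage();   // resolved at load time, not cached at construction
        std::cout << "loading marks from " << storage->full_path << '\n';
    }

private:
    std::shared_ptr<const PartInfoForReader> part_reader;
};

int main()
{
    auto part = std::make_shared<PartInfo>();
    MarksLoader loader(part);
    part->storage = std::make_shared<Storage>(Storage{"/data/part_1_1_0_moved/"});  // storage replaced after construction
    loader.loadMarks();                                                             // sees the new storage
}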


@ -1,9 +1,9 @@
#pragma once
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool_fwd.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -18,7 +18,7 @@ public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path,
size_t marks_count_,
@ -33,7 +33,7 @@ public:
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
private:
DataPartStoragePtr data_part_storage;
MergeTreeDataPartInfoForReaderPtr data_part_reader;
MarkCache * mark_cache = nullptr;
String mrk_path;
size_t marks_count;


@ -36,7 +36,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
settings_,
avg_value_size_hints_)
, marks_loader(
data_part_info_for_read_->getDataPartStorage(),
data_part_info_for_read_,
mark_cache,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),


@ -15,7 +15,7 @@ namespace ErrorCodes
}
MergeTreeReaderStream::MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
@ -35,7 +35,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, all_mark_ranges(all_mark_ranges_)
, file_size(file_size_)
, uncompressed_cache(uncompressed_cache_)
, data_part_storage(std::move(data_part_storage_))
, data_part_storage(data_part_reader_->getDataPartStorage())
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
@ -44,7 +44,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(
data_part_storage,
data_part_reader_,
mark_cache,
index_granularity_info->getMarksFilePath(path_prefix),
marks_count,


@ -9,6 +9,7 @@
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -19,7 +20,7 @@ class MergeTreeReaderStream
{
public:
MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,


@ -242,7 +242,7 @@ void MergeTreeReaderWide::addStreams(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read, stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),


@ -40,7 +40,8 @@ struct Settings;
M(Float, ratio_of_defaults_for_sparse_serialization, 1.0, "Minimal ratio of number of default values to number of all values in column to store it in sparse serializations. If >= 1, columns will be always written in full serialization.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default, it has the same value as `index_granularity`.", 0) \
M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default, it has the same value as `index_granularity_bytes`.", 0) \
M(UInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).", 0) \
M(UInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).", 0) \
M(UInt64, max_replicated_merges_in_queue, 1000, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
@ -126,7 +127,8 @@ struct Settings;
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * DEFAULT_MERGE_BLOCK_SIZE, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
\
/** Compatibility settings */ \


@ -144,7 +144,8 @@ StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr &
return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns, std::move(snapshot_data));
}
Pipe StorageMemory::read(
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /*query_info*/,
@ -153,29 +154,7 @@ Pipe StorageMemory::read(
size_t /*max_block_size*/,
size_t num_streams)
{
return ReadFromMemoryStorageStep::makePipe(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries);
}
void StorageMemory::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams)
{
// @TODO it looks like IStorage::readFromPipe. different only step's type.
auto pipe = read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
if (pipe.empty())
{
auto header = storage_snapshot->getSampleBlockForColumns(column_names);
InterpreterSelectQuery::addEmptySourceToQueryPlan(query_plan, header, query_info, context);
return;
}
auto read_step = std::make_unique<ReadFromMemoryStorageStep>(std::move(pipe));
query_plan.addStep(std::move(read_step));
query_plan.addStep(std::make_unique<ReadFromMemoryStorageStep>(column_names, storage_snapshot, num_streams, delay_read_for_global_subqueries));
}


@ -45,15 +45,6 @@ public:
StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override;
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
void read(
QueryPlan & query_plan,
const Names & column_names,


@ -2480,8 +2480,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
{
/// We check that it was not suddenly upgraded to new version.
/// Otherwise it can be upgraded and instantly become lost, but we cannot notice that.
ops.push_back(zkutil::makeCreateRequest(fs::path(source_path) / "is_lost", "0", zkutil::CreateMode::Persistent));
ops.push_back(zkutil::makeRemoveRequest(fs::path(source_path) / "is_lost", -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper, fs::path(source_path) / "is_lost");
}
else /// The replica we clone should not suddenly become lost.
ops.push_back(zkutil::makeCheckRequest(fs::path(source_path) / "is_lost", source_is_lost_stat.version));
@ -8941,8 +8940,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
/// We must be sure that this part doesn't exist on other replicas
if (!zookeeper->exists(current_part_path))
{
ops.emplace_back(zkutil::makeCreateRequest(current_part_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(current_part_path, -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper, current_part_path);
}
else
{


@ -22,6 +22,7 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{"partition_id", std::make_shared<DataTypeString>()},
{"is_mutation", std::make_shared<DataTypeUInt8>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
{"total_size_bytes_uncompressed", std::make_shared<DataTypeUInt64>()},
{"total_size_marks", std::make_shared<DataTypeUInt64>()},
{"bytes_read_uncompressed", std::make_shared<DataTypeUInt64>()},
{"rows_read", std::make_shared<DataTypeUInt64>()},
@ -59,6 +60,7 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, ContextPtr cont
res_columns[i++]->insert(merge.partition_id);
res_columns[i++]->insert(merge.is_mutation);
res_columns[i++]->insert(merge.total_size_bytes_compressed);
res_columns[i++]->insert(merge.total_size_bytes_uncompressed);
res_columns[i++]->insert(merge.total_size_marks);
res_columns[i++]->insert(merge.bytes_read_uncompressed);
res_columns[i++]->insert(merge.rows_read);


@ -123,7 +123,6 @@
02713_array_low_cardinality_string
02707_skip_index_with_in
02707_complex_query_fails_analyzer
02680_mysql_ast_logical_err
02324_map_combinator_bug
02241_join_rocksdb_bs
02003_WithMergeableStateAfterAggregationAndLimit_LIMIT_BY_LIMIT_OFFSET
@ -136,4 +135,4 @@
00261_storage_aliases_and_array_join
02701_non_parametric_function
01825_type_json_multiple_files
01281_group_by_limit_memory_tracking
01281_group_by_limit_memory_tracking


@ -10,8 +10,8 @@ from typing import Any, Callable, List
import requests # type: ignore
import get_robot_token as grt # we need an updated ROBOT_TOKEN
from ci_config import CI_CONFIG
from get_robot_token import ROBOT_TOKEN, get_best_robot_token
DOWNLOAD_RETRIES_COUNT = 5
@ -56,13 +56,18 @@ def get_gh_api(
def set_auth_header():
if "headers" in kwargs:
if "Authorization" not in kwargs["headers"]:
kwargs["headers"]["Authorization"] = f"Bearer {get_best_robot_token()}"
kwargs["headers"][
"Authorization"
] = f"Bearer {grt.get_best_robot_token()}"
else:
kwargs["headers"] = {"Authorization": f"Bearer {get_best_robot_token()}"}
kwargs["headers"] = {
"Authorization": f"Bearer {grt.get_best_robot_token()}"
}
if ROBOT_TOKEN is not None:
if grt.ROBOT_TOKEN is not None:
set_auth_header()
need_retry = False
for _ in range(retries):
try:
response = get_with_retries(url, 1, sleep, **kwargs)
@ -78,9 +83,11 @@ def get_gh_api(
"Received rate limit exception, setting the auth header and retry"
)
set_auth_header()
need_retry = True
break
return get_with_retries(url, retries, sleep, **kwargs)
if need_retry:
return get_with_retries(url, retries, sleep, **kwargs)
def get_build_name_for_check(check_name: str) -> str:


@ -417,7 +417,7 @@ CHECK_DESCRIPTIONS = [
"AST fuzzer",
"Runs randomly generated queries to catch program errors. "
"The build type is optionally given in parenthesis. "
"If it fails, ask a maintainer for help.",
"If it fails, ask a maintainer for help",
lambda x: x.startswith("AST fuzzer"),
),
CheckDescription(
@ -439,13 +439,13 @@ CHECK_DESCRIPTIONS = [
"information to fix the error, but you might have to reproduce the failure "
"locally. The <b>cmake</b> options can be found in the build log, grepping for "
'<b>cmake</b>. Use these options and follow the <a href="'
'https://clickhouse.com/docs/en/development/build">general build process</a>.',
'https://clickhouse.com/docs/en/development/build">general build process</a>',
lambda x: x.startswith("ClickHouse") and x.endswith("build check"),
),
CheckDescription(
"Compatibility check",
"Checks that <b>clickhouse</b> binary runs on distributions with old libc "
"versions. If it fails, ask a maintainer for help.",
"versions. If it fails, ask a maintainer for help",
lambda x: x.startswith("Compatibility check"),
),
CheckDescription(
@ -465,12 +465,18 @@ CHECK_DESCRIPTIONS = [
"omitting some. If it fails, further checks are not started until it is fixed. "
"Look at the report to see which tests fail, then reproduce the failure "
'locally as described <a href="https://clickhouse.com/docs/en/development/'
'tests#functional-test-locally">here</a>.',
'tests#functional-test-locally">here</a>',
lambda x: x == "Fast test",
),
CheckDescription(
"Flaky tests",
"Checks if new added or modified tests are flaky by running them repeatedly, in parallel, with more randomization. Functional tests are run 100 times with address sanitizer, and additional randomization of thread scheduling. Integrational tests are run up to 10 times. If at least once a new test has failed, or was too long, this check will be red. We don't allow flaky tests, read https://clickhouse.com/blog/decorating-a-christmas-tree-with-the-help-of-flaky-tests/",
"Checks if new added or modified tests are flaky by running them repeatedly, "
"in parallel, with more randomization. Functional tests are run 100 times "
"with address sanitizer, and additional randomization of thread scheduling. "
"Integrational tests are run up to 10 times. If at least once a new test has "
"failed, or was too long, this check will be red. We don't allow flaky tests, "
'read <a href="https://clickhouse.com/blog/decorating-a-christmas-tree-with-'
'the-help-of-flaky-tests/">the doc</a>',
lambda x: "tests flaky check" in x,
),
CheckDescription(
@ -506,37 +512,37 @@ CHECK_DESCRIPTIONS = [
"Sqllogic",
"Run clickhouse on the "
'<a href="https://www.sqlite.org/sqllogictest">sqllogic</a> '
"test set against sqlite and checks that all statements are passed.",
"test set against sqlite and checks that all statements are passed",
lambda x: x.startswith("Sqllogic test"),
),
CheckDescription(
"SQLancer",
"Fuzzing tests that detect logical bugs with "
'<a href="https://github.com/sqlancer/sqlancer">SQLancer</a> tool.',
'<a href="https://github.com/sqlancer/sqlancer">SQLancer</a> tool',
lambda x: x.startswith("SQLancer"),
),
CheckDescription(
"Stateful tests",
"Runs stateful functional tests for ClickHouse binaries built in various "
"configurations -- release, debug, with sanitizers, etc.",
"configurations -- release, debug, with sanitizers, etc",
lambda x: x.startswith("Stateful tests ("),
),
CheckDescription(
"Stateless tests",
"Runs stateless functional tests for ClickHouse binaries built in various "
"configurations -- release, debug, with sanitizers, etc.",
"configurations -- release, debug, with sanitizers, etc",
lambda x: x.startswith("Stateless tests ("),
),
CheckDescription(
"Stress test",
"Runs stateless functional tests concurrently from several clients to detect "
"concurrency-related errors.",
"concurrency-related errors",
lambda x: x.startswith("Stress test ("),
),
CheckDescription(
"Style Check",
"Runs a set of checks to keep the code style clean. If some of tests failed, "
"see the related log from the report.",
"see the related log from the report",
lambda x: x == "Style Check",
),
CheckDescription(
@ -548,7 +554,7 @@ CHECK_DESCRIPTIONS = [
"Upgrade check",
"Runs stress tests on server version from last release and then tries to "
"upgrade it to the version from the PR. It checks if the new server can "
"successfully startup without any errors, crashes or sanitizer asserts.",
"successfully startup without any errors, crashes or sanitizer asserts",
lambda x: x.startswith("Upgrade check ("),
),
CheckDescription(


@ -101,7 +101,21 @@ def post_commit_status(
raise ex
time.sleep(i)
if pr_info:
set_status_comment(commit, pr_info)
status_updated = False
for i in range(RETRY):
try:
set_status_comment(commit, pr_info)
status_updated = True
break
except Exception as ex:
logging.warning(
"Failed to update the status commit, will retry %s times: %s",
RETRY - i - 1,
ex,
)
if not status_updated:
logging.error("Failed to update the status comment, continue anyway")
def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
@ -116,6 +130,18 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
if not statuses:
return
if not [status for status in statuses if status.context == CI_STATUS_NAME]:
# This is the case when some statuses already exist for the check,
# but not the CI_STATUS_NAME. We should create it as pending.
# W/o pr_info to avoid recursion, and yes, one extra create_ci_report
post_commit_status(
commit,
"pending",
create_ci_report(pr_info, statuses),
"The report for running CI",
CI_STATUS_NAME,
)
# We update the report in generate_status_comment function, so do it each
# run, even in the release PRs and normal pushes
comment_body = generate_status_comment(pr_info, statuses)


@ -2117,7 +2117,14 @@ def reportLogStats(args):
'Column ''{}'' already exists', 'No macro {} in config', 'Invalid origin H3 index: {}',
'Invalid session timeout: ''{}''', 'Tuple cannot be empty', 'Database name is empty',
'Table {} is not a Dictionary', 'Expected function, got: {}', 'Unknown identifier: ''{}''',
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist'
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist',
'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}'
) AS known_short_messages
SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
FROM system.text_log


@ -2,7 +2,7 @@ runtime messages 0.001
runtime exceptions 0.05
messages shorter than 10 1
messages shorter than 16 3
exceptions shorter than 30 30
exceptions shorter than 30 3
noisy messages 0.3
noisy Trace messages 0.16
noisy Debug messages 0.09


@ -49,7 +49,14 @@ create temporary table known_short_messages (s String) as select * from (select
'Column ''{}'' already exists', 'No macro {} in config', 'Invalid origin H3 index: {}',
'Invalid session timeout: ''{}''', 'Tuple cannot be empty', 'Database name is empty',
'Table {} is not a Dictionary', 'Expected function, got: {}', 'Unknown identifier: ''{}''',
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist'
'Failed to {} input ''{}''', '{}.{} is not a VIEW', 'Cannot convert NULL to {}', 'Dictionary {} doesn''t exist',
'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}'
] as arr) array join arr;
-- Check that we don't have too many short meaningless message patterns.
@ -59,7 +66,7 @@ select 'messages shorter than 10', max2(countDistinctOrDefault(message_format_st
select 'messages shorter than 16', max2(countDistinctOrDefault(message_format_string), 3) from logs where length(message_format_string) < 16 and message_format_string not in known_short_messages;
-- Same as above, but exceptions must be more informative. Feel free to update the threshold or remove this query if really necessary
select 'exceptions shorter than 30', max2(countDistinctOrDefault(message_format_string), 30) from logs where length(message_format_string) < 30 and message ilike '%DB::Exception%' and message_format_string not in known_short_messages;
select 'exceptions shorter than 30', max2(countDistinctOrDefault(message_format_string), 3) from logs where length(message_format_string) < 30 and message ilike '%DB::Exception%' and message_format_string not in known_short_messages;
-- Avoid too noisy messages: top 1 message frequency must be less than 30%. We should reduce the threshold
@ -98,7 +105,9 @@ select 'incorrect patterns', max2(countDistinct(message_format_string), 15) from
where ((rand() % 8) = 0)
and message not like (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') as s)
and message not like (s || ' (skipped % similar messages)')
and message not like ('%Exception: '||s||'%') group by message_format_string
and message not like ('%Exception: '||s||'%')
and message not like ('%(skipped % similar messages)%')
group by message_format_string
) where any_message not like '%Poco::Exception%';
drop table logs;


@ -246,18 +246,18 @@ toUnixTimestamp
1426415400
1426415400
date_trunc
2019-01-01
2020-01-01
2020-01-01
2019-10-01
2020-01-01
2020-01-01
2019-12-01
2020-01-01
2020-01-01
2019-12-30
2019-12-30
2019-12-30
2019-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-10-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-31 00:00:00
2020-01-01 00:00:00
2020-01-02 00:00:00
@ -270,18 +270,18 @@ date_trunc
2019-12-31 20:11:22
2020-01-01 12:11:22
2020-01-02 05:11:22
2019-01-01
2020-01-01
2020-01-01
2019-10-01
2020-01-01
2020-01-01
2019-12-01
2020-01-01
2020-01-01
2019-12-30
2019-12-30
2019-12-30
2019-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-10-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-31 00:00:00
2020-01-01 00:00:00
2020-01-02 00:00:00
@ -294,8 +294,8 @@ date_trunc
2019-12-31 20:11:22
2020-01-01 12:11:22
2020-01-02 05:11:22
2020-01-01
2020-01-01
2020-01-01
2019-12-30
2020-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2020-01-01 00:00:00


@ -5,7 +5,7 @@
-1275 -424.99999983 -255 -1275 -424.99999983 -255
101 101 101 101 101 101
-101 -101 -101 -101 -101 -101
(101,101,101) (101,101,101) (101,101,101) (101,101,101) (102,100,101)
(101,101,101) (101,101,101) (101,101,101) (101,101,101) (1,1,1,1,1,1)
5 5 5
10 10 10
-50 -50 -16.66666666 -16.66666666 -10 -10
