Merge branch 'master' into keeper-log-improvements

This commit is contained in:
Antonio Andelic 2023-05-03 08:31:21 +00:00
commit 78ecfac8d5
105 changed files with 749 additions and 378 deletions

View File

@ -170,18 +170,6 @@ else ()
set(NO_WHOLE_ARCHIVE --no-whole-archive)
endif ()
option(ENABLE_CURL_BUILD "Enable the curl, azure, sentry build. On by default except on MacOS." ON)
if (OS_DARWIN)
# Disable the curl, azure, sentry build on MacOS
set (ENABLE_CURL_BUILD OFF)
endif ()
option(ENABLE_ISAL_LIBRARY "Enable the ISA-L library. On by default except on aarch64." ON)
if (ARCH_AARCH64)
# Disable ISA-L library on aarch64.
set (ENABLE_ISAL_LIBRARY OFF)
endif ()
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
# Can be lld or ld-lld or lld-13 or /path/to/lld.
if (LINKER_NAME MATCHES "lld")
@ -399,9 +387,9 @@ else()
endif ()
option (ENABLE_GWP_ASAN "Enable Gwp-Asan" ON)
# We use mmap for allocations more heavily in debug builds,
# but GWP-ASan also wants to use mmap frequently,
# and due to a large number of memory mappings,
# they do not work well together.
if ((NOT OS_LINUX AND NOT OS_ANDROID) OR (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG"))
set(ENABLE_GWP_ASAN OFF)

View File

@ -141,20 +141,19 @@ add_contrib (libuv-cmake libuv)
add_contrib (liburing-cmake liburing)
add_contrib (amqpcpp-cmake AMQP-CPP) # requires: libuv
add_contrib (cassandra-cmake cassandra) # requires: libuv
if (ENABLE_CURL_BUILD)
if (NOT OS_DARWIN)
add_contrib (curl-cmake curl)
add_contrib (azure-cmake azure)
add_contrib (sentry-native-cmake sentry-native) # requires: curl
endif()
add_contrib (fmtlib-cmake fmtlib)
add_contrib (krb5-cmake krb5)
add_contrib (cyrus-sasl-cmake cyrus-sasl) # for krb5
add_contrib (libgsasl-cmake libgsasl) # requires krb5
add_contrib (librdkafka-cmake librdkafka) # requires: libgsasl
add_contrib (nats-io-cmake nats-io)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5
add_contrib (isa-l-cmake isa-l)
add_contrib (libhdfs3-cmake libhdfs3) # requires: protobuf, krb5, isa-l
add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift/avro/arrow/libhdfs3
add_contrib (cppkafka-cmake cppkafka)
add_contrib (libpqxx-cmake libpqxx)
@ -178,23 +177,14 @@ add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)
endif()
add_contrib (annoy-cmake annoy)
add_contrib (xxHash-cmake xxHash)
add_contrib (google-benchmark-cmake google-benchmark)
add_contrib (ulid-c-cmake ulid-c)
if (ENABLE_ISAL_LIBRARY)
add_contrib (isa-l-cmake isa-l)
endif()
# Put all targets defined here and in subdirectories under "contrib/<immediate-subdir>" folders in GUI-based IDEs.
# Some third-party projects may override the CMAKE_FOLDER or FOLDER property of their targets, so they would not appear
# in "contrib/..." as originally planned, so we work around this by fixing the FOLDER properties of all targets manually,

View File

@ -1,3 +1,7 @@
if (NOT ARCH_AMD64)
return()
endif ()
set(ISAL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/isa-l")
# The YASM and NASM assemblers are somewhat mutually compatible. ISAL specifically needs NASM. If only YASM is installed, then check_language(ASM_NASM)

View File

@ -172,7 +172,7 @@ if (TARGET OpenSSL::SSL)
target_link_libraries(_hdfs3 PRIVATE OpenSSL::Crypto OpenSSL::SSL)
endif()
if (ENABLE_ISAL_LIBRARY)
if (TARGET ch_contrib::isal)
target_link_libraries(_hdfs3 PRIVATE ch_contrib::isal)
add_definitions(-DHADOOP_ISAL_LIBRARY)
endif()

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -22,7 +22,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.4.1.1943"
ARG VERSION="23.4.2.11"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.4.2.11-stable (b6442320f9d) FIXME as compared to v23.4.1.1943-stable (3920eb987f7)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Revert "Fix GCS native copy ([#48981](https://github.com/ClickHouse/ClickHouse/issues/48981))" [#49194](https://github.com/ClickHouse/ClickHouse/pull/49194) ([Raúl Marín](https://github.com/Algunenano)).
* Fix race on Outdated parts loading [#49223](https://github.com/ClickHouse/ClickHouse/pull/49223) ([Alexander Tokmakov](https://github.com/tavplubix)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Implement status comment [#48468](https://github.com/ClickHouse/ClickHouse/pull/48468) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Update curl to 8.0.1 (for CVEs) [#48765](https://github.com/ClickHouse/ClickHouse/pull/48765) ([Boris Kuschel](https://github.com/bkuschel)).
* Fallback auth gh api [#49314](https://github.com/ClickHouse/ClickHouse/pull/49314) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -50,7 +50,7 @@ struct AggregateFunctionSequenceMatchData final
bool sorted = true;
PODArrayWithStackMemory<TimestampEvents, 64> events_list;
/// sequenceMatch conditions met at least once in events_list
std::bitset<max_events> conditions_met;
Events conditions_met;
void add(const Timestamp timestamp, const Events & events)
{
@ -101,6 +101,11 @@ struct AggregateFunctionSequenceMatchData final
size_t size;
readBinary(size, buf);
/// If we lose these flags, functionality is broken
/// If we serialize/deserialize these flags, we have compatibility issues
/// If we set these flags to 1, we have a minor performance penalty, which seems acceptable
conditions_met.set();
events_list.clear();
events_list.reserve(size);
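
The trade-off spelled out in the three comments above can be condensed into a minimal sketch (hypothetical max_events value and struct name; not the actual aggregate function state):

#include <bitset>

/// Illustrative stand-in for the real constant in AggregateFunctionSequenceMatch.
constexpr size_t max_events = 32;

struct SequenceMatchStateSketch
{
    std::bitset<max_events> conditions_met;

    void deserialize()
    {
        /// The flags are not serialized (writing them would break compatibility
        /// with older servers), so assume every condition has fired: a false
        /// positive only forces a redundant re-check of the pattern, never a
        /// wrong result.
        conditions_met.set();
    }
};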

View File

@ -319,6 +319,9 @@ struct CheckRequest : virtual Request
String path;
int32_t version = -1;
/// should it check if a node DOES NOT exist
bool not_exists = false;
void addRootPath(const String & root_path) override;
String getPath() const override { return path; }
@ -524,7 +527,7 @@ public:
const Requests & requests,
MultiCallback callback) = 0;
virtual DB::KeeperApiVersion getApiVersion() = 0;
virtual DB::KeeperApiVersion getApiVersion() const = 0;
/// Expire session and finish all pending requests
virtual void finalize(const String & reason) = 0;

View File

@ -91,7 +91,7 @@ public:
void finalize(const String & reason) override;
DB::KeeperApiVersion getApiVersion() override
DB::KeeperApiVersion getApiVersion() const override
{
return KeeperApiVersion::ZOOKEEPER_COMPATIBLE;
}

View File

@ -821,7 +821,7 @@ bool ZooKeeper::expired()
return impl->isExpired();
}
DB::KeeperApiVersion ZooKeeper::getApiVersion()
DB::KeeperApiVersion ZooKeeper::getApiVersion() const
{
return impl->getApiVersion();
}
@ -1282,7 +1282,6 @@ Coordination::RequestPtr makeExistsRequest(const std::string & path)
return request;
}
std::string normalizeZooKeeperPath(std::string zookeeper_path, bool check_starts_with_slash, Poco::Logger * log)
{
if (!zookeeper_path.empty() && zookeeper_path.back() == '/')

View File

@ -215,7 +215,7 @@ public:
/// Returns true, if the session has expired.
bool expired();
DB::KeeperApiVersion getApiVersion();
DB::KeeperApiVersion getApiVersion() const;
/// Create a znode.
/// Throw an exception if something went wrong.
@ -674,4 +674,20 @@ bool hasZooKeeperConfig(const Poco::Util::AbstractConfiguration & config);
String getZooKeeperConfigName(const Poco::Util::AbstractConfiguration & config);
template <typename Client>
void addCheckNotExistsRequest(Coordination::Requests & requests, const Client & client, const std::string & path)
{
if (client.getApiVersion() >= DB::KeeperApiVersion::WITH_CHECK_NOT_EXISTS)
{
auto request = std::make_shared<Coordination::CheckRequest>();
request->path = path;
request->not_exists = true;
requests.push_back(std::move(request));
return;
}
requests.push_back(makeCreateRequest(path, "", zkutil::CreateMode::Persistent));
requests.push_back(makeRemoveRequest(path, -1));
}
}
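
A hedged usage sketch of the helper above (the paths and the surrounding multi-op are hypothetical; zookeeper stands for any client satisfying the Client parameter, e.g. zkutil::ZooKeeper):

/// Build a multi-op that must fail if "/table/block_hash" already exists.
/// On servers with WITH_CHECK_NOT_EXISTS this adds a single CheckNotExists op;
/// on older servers it falls back to the create(path) + remove(path, -1) pair,
/// which fails with ZNODEEXISTS at the create step when the node is present.
Coordination::Requests ops;
zkutil::addCheckNotExistsRequest(ops, *zookeeper, "/table/block_hash");
ops.emplace_back(zkutil::makeCreateRequest("/table/lock", "", zkutil::CreateMode::EphemeralSequential));
Coordination::Responses responses;
Coordination::Error rc = zookeeper->tryMulti(ops, responses);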

View File

@ -666,7 +666,15 @@ ZooKeeperResponsePtr ZooKeeperGetRequest::makeResponse() const { return setTime(
ZooKeeperResponsePtr ZooKeeperSetRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSetResponse>()); }
ZooKeeperResponsePtr ZooKeeperListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperListResponse>()); }
ZooKeeperResponsePtr ZooKeeperSimpleListRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperSimpleListResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const { return setTime(std::make_shared<ZooKeeperCheckResponse>()); }
ZooKeeperResponsePtr ZooKeeperCheckRequest::makeResponse() const
{
if (not_exists)
return setTime(std::make_shared<ZooKeeperCheckNotExistsResponse>());
return setTime(std::make_shared<ZooKeeperCheckResponse>());
}
ZooKeeperResponsePtr ZooKeeperMultiRequest::makeResponse() const
{
std::shared_ptr<ZooKeeperMultiResponse> response;
@ -931,6 +939,8 @@ void registerZooKeeperRequest(ZooKeeperRequestFactory & factory)
res->operation_type = ZooKeeperMultiRequest::OperationType::Read;
else if constexpr (num == OpNum::Multi)
res->operation_type = ZooKeeperMultiRequest::OperationType::Write;
else if constexpr (num == OpNum::CheckNotExists)
res->not_exists = true;
return res;
});
@ -956,6 +966,7 @@ ZooKeeperRequestFactory::ZooKeeperRequestFactory()
registerZooKeeperRequest<OpNum::GetACL, ZooKeeperGetACLRequest>(*this);
registerZooKeeperRequest<OpNum::SetACL, ZooKeeperSetACLRequest>(*this);
registerZooKeeperRequest<OpNum::FilteredList, ZooKeeperFilteredListRequest>(*this);
registerZooKeeperRequest<OpNum::CheckNotExists, ZooKeeperCheckRequest>(*this);
}
PathMatchResult matchPath(std::string_view path, std::string_view match_to)

View File

@ -390,12 +390,12 @@ struct ZooKeeperSimpleListResponse final : ZooKeeperListResponse
size_t bytesSize() const override { return ZooKeeperListResponse::bytesSize() - sizeof(stat); }
};
struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
struct ZooKeeperCheckRequest : CheckRequest, ZooKeeperRequest
{
ZooKeeperCheckRequest() = default;
explicit ZooKeeperCheckRequest(const CheckRequest & base) : CheckRequest(base) {}
OpNum getOpNum() const override { return OpNum::Check; }
OpNum getOpNum() const override { return not_exists ? OpNum::CheckNotExists : OpNum::Check; }
void writeImpl(WriteBuffer & out) const override;
void readImpl(ReadBuffer & in) override;
std::string toStringImpl() const override;
@ -408,7 +408,7 @@ struct ZooKeeperCheckRequest final : CheckRequest, ZooKeeperRequest
void createLogElements(LogElements & elems) const override;
};
struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
struct ZooKeeperCheckResponse : CheckResponse, ZooKeeperResponse
{
void readImpl(ReadBuffer &) override {}
void writeImpl(WriteBuffer &) const override {}
@ -417,6 +417,12 @@ struct ZooKeeperCheckResponse final : CheckResponse, ZooKeeperResponse
size_t bytesSize() const override { return CheckResponse::bytesSize() + sizeof(xid) + sizeof(zxid); }
};
struct ZooKeeperCheckNotExistsResponse : public ZooKeeperCheckResponse
{
OpNum getOpNum() const override { return OpNum::CheckNotExists; }
using ZooKeeperCheckResponse::ZooKeeperCheckResponse;
};
/// This response may be received only as an element of responses in MultiResponse.
struct ZooKeeperErrorResponse final : ErrorResponse, ZooKeeperResponse
{

View File

@ -26,6 +26,7 @@ static const std::unordered_set<int32_t> VALID_OPERATIONS =
static_cast<int32_t>(OpNum::SetACL),
static_cast<int32_t>(OpNum::GetACL),
static_cast<int32_t>(OpNum::FilteredList),
static_cast<int32_t>(OpNum::CheckNotExists),
};
std::string toString(OpNum op_num)
@ -70,6 +71,8 @@ std::string toString(OpNum op_num)
return "GetACL";
case OpNum::FilteredList:
return "FilteredList";
case OpNum::CheckNotExists:
return "CheckNotExists";
}
int32_t raw_op = static_cast<int32_t>(op_num);
throw Exception("Operation " + std::to_string(raw_op) + " is unknown", Error::ZUNIMPLEMENTED);

View File

@ -36,6 +36,7 @@ enum class OpNum : int32_t
// CH Keeper specific operations
FilteredList = 500,
CheckNotExists = 501,
SessionID = 997, /// Special internal request
};

View File

@ -1085,7 +1085,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
ProfileEvents::increment(ProfileEvents::ZooKeeperTransactions);
}
KeeperApiVersion ZooKeeper::getApiVersion()
KeeperApiVersion ZooKeeper::getApiVersion() const
{
return keeper_api_version;
}

View File

@ -179,7 +179,7 @@ public:
const Requests & requests,
MultiCallback callback) override;
DB::KeeperApiVersion getApiVersion() override;
DB::KeeperApiVersion getApiVersion() const override;
/// Without forcefully invalidating (finalizing) ZooKeeper session before
/// establishing a new one, there was a possibility that server is using

View File

@ -6,6 +6,7 @@
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/logger_useful.h>
#include <Common/randomSeed.h>
#include "Coordination/KeeperConstants.h"
namespace DB
{
@ -381,6 +382,11 @@ public:
ephemeral_nodes.clear();
}
KeeperApiVersion getApiVersion() const
{
return keeper->getApiVersion();
}
private:
void faultInjectionBefore(std::function<void()> fault_cleanup)
{

View File

@ -9,10 +9,11 @@ enum class KeeperApiVersion : uint8_t
{
ZOOKEEPER_COMPATIBLE = 0,
WITH_FILTERED_LIST,
WITH_MULTI_READ
WITH_MULTI_READ,
WITH_CHECK_NOT_EXISTS,
};
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_MULTI_READ;
inline constexpr auto current_keeper_api_version = KeeperApiVersion::WITH_CHECK_NOT_EXISTS;
const std::string keeper_system_path = "/keeper";
const std::string keeper_api_version_path = keeper_system_path + "/api_version";
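
For context, a sketch of how a client might discover the advertised version (illustrative only; the real negotiation lives inside the ZooKeeper client implementation):

/// Hypothetical discovery snippet: read the api_version znode and fall back
/// to ZOOKEEPER_COMPATIBLE when it is absent (servers that predate it).
String version_str;
auto api_version = DB::KeeperApiVersion::ZOOKEEPER_COMPATIBLE;
if (zookeeper->tryGet(DB::keeper_api_version_path, version_str))
    api_version = static_cast<DB::KeeperApiVersion>(parse<uint8_t>(version_str));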

View File

@ -1449,24 +1449,39 @@ struct KeeperStorageListRequestProcessor final : public KeeperStorageRequestProc
struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestProcessor
{
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
explicit KeeperStorageCheckRequestProcessor(const Coordination::ZooKeeperRequestPtr & zk_request_)
: KeeperStorageRequestProcessor(zk_request_)
{
return storage.checkACL(zk_request->getPath(), Coordination::ACL::Read, session_id, is_local);
check_not_exists = zk_request->getOpNum() == Coordination::OpNum::CheckNotExists;
}
bool checkAuth(KeeperStorage & storage, int64_t session_id, bool is_local) const override
{
auto path = zk_request->getPath();
return storage.checkACL(check_not_exists ? parentPath(path) : path, Coordination::ACL::Read, session_id, is_local);
}
using KeeperStorageRequestProcessor::KeeperStorageRequestProcessor;
std::vector<KeeperStorage::Delta>
preprocess(KeeperStorage & storage, int64_t zxid, int64_t /*session_id*/, int64_t /*time*/, uint64_t & /*digest*/, const KeeperContext & /*keeper_context*/) const override
{
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
Coordination::ZooKeeperCheckRequest & request = dynamic_cast<Coordination::ZooKeeperCheckRequest &>(*zk_request);
if (!storage.uncommitted_state.getNode(request.path))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
auto node = storage.uncommitted_state.getNode(request.path);
if (request.version != -1 && request.version != node->stat.version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
if (check_not_exists)
{
if (node && (request.version == -1 || request.version == node->stat.version))
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNODEEXISTS}};
}
else
{
if (!node)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZNONODE}};
if (request.version != -1 && request.version != node->stat.version)
return {KeeperStorage::Delta{zxid, Coordination::Error::ZBADVERSION}};
}
return {};
}
@ -1497,17 +1512,22 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
auto & container = storage.container;
auto node_it = container.find(request.path);
if (node_it == container.end())
if (check_not_exists)
{
on_error(Coordination::Error::ZNONODE);
}
else if (request.version != -1 && request.version != node_it->value.stat.version)
{
on_error(Coordination::Error::ZBADVERSION);
if (node_it != container.end() && (request.version == -1 || request.version == node_it->value.stat.version))
on_error(Coordination::Error::ZNODEEXISTS);
else
response.error = Coordination::Error::ZOK;
}
else
{
response.error = Coordination::Error::ZOK;
if (node_it == container.end())
on_error(Coordination::Error::ZNONODE);
else if (request.version != -1 && request.version != node_it->value.stat.version)
on_error(Coordination::Error::ZBADVERSION);
else
response.error = Coordination::Error::ZOK;
}
return response_ptr;
@ -1523,6 +1543,9 @@ struct KeeperStorageCheckRequestProcessor final : public KeeperStorageRequestPro
ProfileEvents::increment(ProfileEvents::KeeperCheckRequest);
return processImpl<true>(storage, zxid);
}
private:
bool check_not_exists;
};
@ -1716,6 +1739,7 @@ struct KeeperStorageMultiRequestProcessor final : public KeeperStorageRequestPro
concrete_requests.push_back(std::make_shared<KeeperStorageSetRequestProcessor>(sub_zk_request));
break;
case Coordination::OpNum::Check:
case Coordination::OpNum::CheckNotExists:
check_operation_type(OperationType::Write);
concrete_requests.push_back(std::make_shared<KeeperStorageCheckRequestProcessor>(sub_zk_request));
break;
@ -1971,6 +1995,7 @@ KeeperStorageRequestProcessorsFactory::KeeperStorageRequestProcessorsFactory()
registerKeeperRequestProcessor<Coordination::OpNum::MultiRead, KeeperStorageMultiRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::SetACL, KeeperStorageSetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::GetACL, KeeperStorageGetACLRequestProcessor>(*this);
registerKeeperRequestProcessor<Coordination::OpNum::CheckNotExists, KeeperStorageCheckRequestProcessor>(*this);
}
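
The CheckNotExists semantics implemented by KeeperStorageCheckRequestProcessor above can be restated as a tiny standalone predicate (a sketch of the result matrix, not the storage code itself):

/// node exists | requested version | result
/// no          | any               | ZOK
/// yes         | -1 or matching    | ZNODEEXISTS
/// yes         | non-matching      | ZOK
Coordination::Error checkNotExistsResult(bool node_exists, int32_t requested_version, int32_t node_version)
{
    if (node_exists && (requested_version == -1 || requested_version == node_version))
        return Coordination::Error::ZNODEEXISTS;
    return Coordination::Error::ZOK;
}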

View File

@ -2451,6 +2451,78 @@ TEST_P(CoordinationTest, ChangelogTestMaxLogSize)
}
TEST_P(CoordinationTest, TestCheckNotExistsRequest)
{
using namespace DB;
using namespace Coordination;
KeeperStorage storage{500, "", keeper_context};
int32_t zxid = 0;
const auto create_path = [&](const auto & path)
{
const auto create_request = std::make_shared<ZooKeeperCreateRequest>();
int new_zxid = ++zxid;
create_request->path = path;
storage.preprocessRequest(create_request, 1, 0, new_zxid);
auto responses = storage.processRequest(create_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
EXPECT_EQ(responses[0].response->error, Coordination::Error::ZOK) << "Failed to create " << path;
};
const auto check_request = std::make_shared<ZooKeeperCheckRequest>();
check_request->path = "/test_node";
check_request->not_exists = true;
{
SCOPED_TRACE("CheckNotExists returns ZOK");
int new_zxid = ++zxid;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
create_path("/test_node");
auto node_it = storage.container.find("/test_node");
ASSERT_NE(node_it, storage.container.end());
auto node_version = node_it->value.stat.version;
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS");
int new_zxid = ++zxid;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
{
SCOPED_TRACE("CheckNotExists returns ZNODEEXISTS for same version");
int new_zxid = ++zxid;
check_request->version = node_version;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZNODEEXISTS) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
{
SCOPED_TRACE("CheckNotExists returns ZOK for different version");
int new_zxid = ++zxid;
check_request->version = node_version + 1;
storage.preprocessRequest(check_request, 1, 0, new_zxid);
auto responses = storage.processRequest(check_request, 1, new_zxid);
EXPECT_GE(responses.size(), 1);
auto error = responses[0].response->error;
EXPECT_EQ(error, Coordination::Error::ZOK) << "CheckNotExists returned invalid result: " << errorMessage(error);
}
}
INSTANTIATE_TEST_SUITE_P(CoordinationTestSuite,
CoordinationTest,

View File

@ -29,11 +29,6 @@
#define DEFAULT_INSERT_BLOCK_SIZE \
1048449 /// 1048576 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
/** The same, but for merge operations. Less than DEFAULT_BLOCK_SIZE, for saving RAM (since all the columns are read).
* Significantly less, since there are 10-way mergers.
*/
#define DEFAULT_MERGE_BLOCK_SIZE 8192
#define DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC 60
#define SHOW_CHARS_ON_SYNTAX_ERROR ptrdiff_t(160)
#define DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES 3
@ -83,4 +78,3 @@
#else
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 0
#endif

View File

@ -493,11 +493,6 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
chassert(file_offset_of_buffer_end > completed_range.right);
if (read_type == ReadType::CACHED)
{
chassert(current_file_segment->getDownloadedSize(true) == current_file_segment->range().size());
}
file_segments->popFront();
if (file_segments->empty())
return false;

View File

@ -1,6 +1,6 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeInterval.h>
@ -25,7 +25,7 @@ class FunctionDateTrunc : public IFunction
public:
static constexpr auto name = "dateTrunc";
explicit FunctionDateTrunc(ContextPtr context_) : context(context_) {}
explicit FunctionDateTrunc(ContextPtr context_) : context(context_) { }
static FunctionPtr create(ContextPtr context) { return std::make_shared<FunctionDateTrunc>(context); }
@ -39,51 +39,58 @@ public:
{
/// The first argument is a constant string with the name of datepart.
auto result_type_is_date = false;
intermediate_type_is_date = false;
String datepart_param;
auto check_first_argument = [&] {
auto check_first_argument = [&]
{
const ColumnConst * datepart_column = checkAndGetColumnConst<ColumnString>(arguments[0].column.get());
if (!datepart_column)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "First argument for function {} must be constant string: "
"name of datepart", getName());
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"First argument for function {} must be constant string: "
"name of datepart",
getName());
datepart_param = datepart_column->getValue<String>();
if (datepart_param.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "First argument (name of datepart) for function {} cannot be empty",
getName());
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "First argument (name of datepart) for function {} cannot be empty", getName());
if (!IntervalKind::tryParseString(datepart_param, datepart_kind))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} doesn't look like datepart name in {}", datepart_param, getName());
result_type_is_date = (datepart_kind == IntervalKind::Year)
|| (datepart_kind == IntervalKind::Quarter) || (datepart_kind == IntervalKind::Month)
|| (datepart_kind == IntervalKind::Week);
intermediate_type_is_date = (datepart_kind == IntervalKind::Year) || (datepart_kind == IntervalKind::Quarter)
|| (datepart_kind == IntervalKind::Month) || (datepart_kind == IntervalKind::Week);
};
bool second_argument_is_date = false;
auto check_second_argument = [&] {
auto check_second_argument = [&]
{
if (!isDate(arguments[1].type) && !isDateTime(arguments[1].type) && !isDateTime64(arguments[1].type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of 2nd argument of function {}. "
"Should be a date or a date with time", arguments[1].type->getName(), getName());
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of 2nd argument of function {}. "
"Should be a date or a date with time",
arguments[1].type->getName(),
getName());
second_argument_is_date = isDate(arguments[1].type);
if (second_argument_is_date && ((datepart_kind == IntervalKind::Hour)
|| (datepart_kind == IntervalKind::Minute) || (datepart_kind == IntervalKind::Second)))
if (second_argument_is_date
&& ((datepart_kind == IntervalKind::Hour) || (datepart_kind == IntervalKind::Minute)
|| (datepart_kind == IntervalKind::Second)))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type Date of argument for function {}", getName());
};
auto check_timezone_argument = [&] {
auto check_timezone_argument = [&]
{
if (!WhichDataType(arguments[2].type).isString())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}. "
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Illegal type {} of argument of function {}. "
"This argument is optional and must be a constant string with timezone name",
arguments[2].type->getName(), getName());
if (second_argument_is_date && result_type_is_date)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"The timezone argument of function {} with datepart '{}' "
"is allowed only when the 2nd argument has the type DateTime",
getName(), datepart_param);
arguments[2].type->getName(),
getName());
};
if (arguments.size() == 2)
@ -99,15 +106,14 @@ public:
}
else
{
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Number of arguments for function {} doesn't match: passed {}, should be 2 or 3",
getName(), arguments.size());
getName(),
arguments.size());
}
if (result_type_is_date)
return std::make_shared<DataTypeDate>();
else
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 1));
return std::make_shared<DataTypeDateTime>(extractTimeZoneNameFromFunctionArguments(arguments, 2, 1));
}
bool useDefaultImplementationForConstants() const override { return true; }
@ -124,26 +130,40 @@ public:
auto to_start_of_interval = FunctionFactory::instance().get("toStartOfInterval", context);
ColumnPtr truncated_column;
auto date_type = std::make_shared<DataTypeDate>();
if (arguments.size() == 2)
return to_start_of_interval->build(temp_columns)->execute(temp_columns, result_type, input_rows_count);
truncated_column = to_start_of_interval->build(temp_columns)
->execute(temp_columns, intermediate_type_is_date ? date_type : result_type, input_rows_count);
else
{
temp_columns[2] = arguments[2];
truncated_column = to_start_of_interval->build(temp_columns)
->execute(temp_columns, intermediate_type_is_date ? date_type : result_type, input_rows_count);
}
temp_columns[2] = arguments[2];
return to_start_of_interval->build(temp_columns)->execute(temp_columns, result_type, input_rows_count);
if (!intermediate_type_is_date)
return truncated_column;
ColumnsWithTypeAndName temp_truncated_column(1);
temp_truncated_column[0] = {truncated_column, date_type, ""};
auto to_date_time_or_default = FunctionFactory::instance().get("toDateTime", context);
return to_date_time_or_default->build(temp_truncated_column)->execute(temp_truncated_column, result_type, input_rows_count);
}
bool hasInformationAboutMonotonicity() const override
{
return true;
}
bool hasInformationAboutMonotonicity() const override { return true; }
Monotonicity getMonotonicityForRange(const IDataType &, const Field &, const Field &) const override
{
return { .is_monotonic = true, .is_always_monotonic = true };
return {.is_monotonic = true, .is_always_monotonic = true};
}
private:
ContextPtr context;
mutable IntervalKind::Kind datepart_kind = IntervalKind::Kind::Second;
mutable bool intermediate_type_is_date = false;
};
}

View File

@ -33,7 +33,7 @@ String getExceptionEntryWithFileName(const ReadBuffer & in)
if (filename.empty())
return "";
return fmt::format("; While reading from: {}", filename);
return fmt::format(": While reading from: {}", filename);
}
}

View File

@ -278,8 +278,8 @@ void CacheMetadata::doCleanup()
}
catch (...)
{
chassert(false);
tryLogCurrentException(__PRETTY_FUNCTION__);
chassert(false);
}
}
}

View File

@ -165,6 +165,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -220,6 +221,7 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -254,6 +256,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.getNumStreams(),
sort_description,
rows_in_block,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
pipeline.addTransform(std::move(transform));
@ -331,6 +334,7 @@ Block SortedBlocksBuffer::mergeBlocks(Blocks && blocks) const
builder.getNumStreams(),
sort_description,
num_rows,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Default);
builder.addTransform(std::move(transform));

View File

@ -147,6 +147,7 @@ void TableJoin::addDisjunct()
void TableJoin::addOnKeys(ASTPtr & left_table_ast, ASTPtr & right_table_ast)
{
addKey(left_table_ast->getColumnName(), right_table_ast->getAliasOrColumnName(), left_table_ast, right_table_ast);
right_key_aliases[right_table_ast->getColumnName()] = right_table_ast->getAliasOrColumnName();
}
/// @return how many times right key appears in ON section.
@ -662,6 +663,14 @@ String TableJoin::renamedRightColumnName(const String & name) const
return name;
}
String TableJoin::renamedRightColumnNameWithAlias(const String & name) const
{
auto renamed = renamedRightColumnName(name);
if (const auto it = right_key_aliases.find(renamed); it != right_key_aliases.end())
return it->second;
return renamed;
}
void TableJoin::setRename(const String & from, const String & to)
{
renames[from] = to;

View File

@ -156,6 +156,13 @@ private:
/// Original name -> name. Only renamed columns.
std::unordered_map<String, String> renames;
/// Map column name to actual key name that can be an alias.
/// Example: SELECT r.id as rid from t JOIN r ON t.id = rid
/// Map: r.id -> rid
/// Required only for StorageJoin to map join keys back to original column names.
/// (workaround for ExpressionAnalyzer)
std::unordered_map<String, String> right_key_aliases;
VolumePtr tmp_volume;
std::shared_ptr<StorageJoin> right_storage_join;
@ -333,6 +340,7 @@ public:
Block getRequiredRightKeys(const Block & right_table_keys, std::vector<String> & keys_sources) const;
String renamedRightColumnName(const String & name) const;
String renamedRightColumnNameWithAlias(const String & name) const;
void setRename(const String & from, const String & to);
void resetKeys();
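
A self-contained illustration of the alias lookup declared above (hypothetical data mirroring the header comment):

#include <iostream>
#include <string>
#include <unordered_map>

int main()
{
    /// The map as addOnKeys() would build it for:
    ///   SELECT r.id AS rid FROM t JOIN r ON t.id = rid
    std::unordered_map<std::string, std::string> right_key_aliases{{"r.id", "rid"}};

    /// Mimics renamedRightColumnNameWithAlias(): fall through when no alias exists.
    auto resolve = [&](const std::string & renamed)
    {
        auto it = right_key_aliases.find(renamed);
        return it != right_key_aliases.end() ? it->second : renamed;
    };

    std::cout << resolve("r.id") << '\n'; // "rid": join key mapped back to its alias
    std::cout << resolve("r.x") << '\n';  // "r.x": unchanged, no alias registered
}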

View File

@ -87,6 +87,7 @@ NamesAndTypesList ZooKeeperLogElement::getNamesAndTypes()
{"Auth", static_cast<Int16>(Coordination::OpNum::Auth)},
{"SessionID", static_cast<Int16>(Coordination::OpNum::SessionID)},
{"FilteredList", static_cast<Int16>(Coordination::OpNum::FilteredList)},
{"CheckNotExists", static_cast<Int16>(Coordination::OpNum::CheckNotExists)},
});
auto error_enum = getCoordinationErrorCodesEnumType();

View File

@ -1429,10 +1429,12 @@ bool ParserAlias::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!allow_alias_without_as_keyword && !has_as_word)
return false;
bool is_quoted = pos->type == TokenType::QuotedIdentifier;
if (!id_p.parse(pos, node, expected))
return false;
if (!has_as_word)
if (!has_as_word && !is_quoted)
{
/** In this case, the alias can not match the keyword -
* so that in the query "SELECT x FROM t", the word FROM was not considered an alias,

View File

@ -13,14 +13,18 @@ class AggregatingSortedTransform final : public IMergingTransform<AggregatingSor
{
public:
AggregatingSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size)
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
max_block_size)
max_block_size_rows,
max_block_size_bytes)
{
}

View File

@ -159,8 +159,11 @@ AggregatingSortedAlgorithm::SimpleAggregateDescription::~SimpleAggregateDescript
AggregatingSortedAlgorithm::AggregatingMergedData::AggregatingMergedData(
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_), def(def_)
MutableColumns columns_,
UInt64 max_block_size_rows_,
UInt64 max_block_size_bytes_,
ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_rows_, max_block_size_bytes_), def(def_)
{
initAggregateDescription();
@ -257,10 +260,14 @@ void AggregatingSortedAlgorithm::AggregatingMergedData::initAggregateDescription
AggregatingSortedAlgorithm::AggregatingSortedAlgorithm(
const Block & header_, size_t num_inputs, SortDescription description_, size_t max_block_size)
const Block & header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, description_)
, columns_definition(defineColumns(header_, description_))
, merged_data(getMergedColumns(header_, columns_definition), max_block_size, columns_definition)
, merged_data(getMergedColumns(header_, columns_definition), max_block_size_rows_, max_block_size_bytes_, columns_definition)
{
}

View File

@ -18,8 +18,11 @@ class AggregatingSortedAlgorithm final : public IMergingAlgorithmWithDelayedChun
{
public:
AggregatingSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size);
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -96,7 +99,11 @@ private:
using MergedData::insertRow;
public:
AggregatingMergedData(MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_);
AggregatingMergedData(
MutableColumns columns_,
UInt64 max_block_size_rows_,
UInt64 max_block_size_bytes_,
ColumnsDefinition & def_);
/// Group is a group of rows with the same sorting key. It represents single row in result.
/// Algorithm is: start group, add several rows, finish group.

View File

@ -26,12 +26,13 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
SortDescription description_,
const String & sign_column,
bool only_positive_sign_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
, sign_column_number(header_.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, log(log_)

View File

@ -32,7 +32,8 @@ public:
SortDescription description_,
const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Poco::Logger * log_,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
@ -74,4 +75,3 @@ private:
};
}

View File

@ -30,9 +30,9 @@ FinishAggregatingInOrderAlgorithm::FinishAggregatingInOrderAlgorithm(
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_)
: header(header_), num_inputs(num_inputs_), params(params_), max_block_size(max_block_size_), max_block_bytes(max_block_bytes_)
size_t max_block_size_rows_,
size_t max_block_size_bytes_)
: header(header_), num_inputs(num_inputs_), params(params_), max_block_size_rows(max_block_size_rows_), max_block_size_bytes(max_block_size_bytes_)
{
for (const auto & column_description : description_)
description.emplace_back(column_description, header_.getPositionByName(column_description.column_name));
@ -118,7 +118,7 @@ IMergingAlgorithm::Status FinishAggregatingInOrderAlgorithm::merge()
inputs_to_update.pop_back();
/// Do not merge blocks, if there are too few rows or bytes.
if (accumulated_rows >= max_block_size || accumulated_bytes >= max_block_bytes)
if (accumulated_rows >= max_block_size_rows || accumulated_bytes >= max_block_size_bytes)
status.chunk = prepareToMerge();
return status;

View File

@ -42,8 +42,8 @@ public:
size_t num_inputs_,
AggregatingTransformParamsPtr params_,
const SortDescription & description_,
size_t max_block_size_,
size_t max_block_bytes_);
size_t max_block_size_rows_,
size_t max_block_size_bytes_);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -79,8 +79,8 @@ private:
size_t num_inputs;
AggregatingTransformParamsPtr params;
SortDescriptionWithPositions description;
size_t max_block_size;
size_t max_block_bytes;
size_t max_block_size_rows;
size_t max_block_size_bytes;
Inputs current_inputs;

View File

@ -42,11 +42,12 @@ GraphiteRollupSortedAlgorithm::GraphiteRollupSortedAlgorithm(
const Block & header_,
size_t num_inputs,
SortDescription description_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Graphite::Params params_,
time_t time_of_merge_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), nullptr, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), false, max_block_size)
, merged_data(header_.cloneEmptyColumns(), false, max_block_size_rows_, max_block_size_bytes_)
, params(std::move(params_))
, time_of_merge(time_of_merge_)
{

View File

@ -22,9 +22,13 @@ class GraphiteRollupSortedAlgorithm final : public IMergingAlgorithmWithSharedCh
{
public:
GraphiteRollupSortedAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_);
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
Graphite::Params params_,
time_t time_of_merge_);
Status merge() override;

View File

@ -19,8 +19,8 @@ namespace ErrorCodes
class MergedData
{
public:
explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_)
: columns(std::move(columns_)), max_block_size(max_block_size_), use_average_block_size(use_average_block_size_)
explicit MergedData(MutableColumns columns_, bool use_average_block_size_, UInt64 max_block_size_, UInt64 max_block_size_bytes_)
: columns(std::move(columns_)), max_block_size(max_block_size_), max_block_size_bytes(max_block_size_bytes_), use_average_block_size(use_average_block_size_)
{
}
@ -117,6 +117,16 @@ public:
if (merged_rows >= max_block_size)
return true;
/// Never return more than max_block_size_bytes
if (max_block_size_bytes)
{
size_t merged_bytes = 0;
for (const auto & column : columns)
merged_bytes += column->allocatedBytes();
if (merged_bytes >= max_block_size_bytes)
return true;
}
if (!use_average_block_size)
return false;
@ -143,8 +153,9 @@ protected:
UInt64 total_chunks = 0;
UInt64 total_allocated_bytes = 0;
const UInt64 max_block_size;
const bool use_average_block_size;
const UInt64 max_block_size = 0;
const UInt64 max_block_size_bytes = 0;
const bool use_average_block_size = false;
bool need_flush = false;
};
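
The combined limit can be summarized as a small free-standing predicate (a sketch of the condition above; passing 0 for the byte limit preserves the old rows-only behaviour, which is what every call site touched by this commit does):

#include <cstddef>

/// Sketch: a merged block is flushed once either limit is reached.
bool shouldFlush(size_t merged_rows, size_t merged_bytes,
                 size_t max_block_size_rows, size_t max_block_size_bytes)
{
    if (merged_rows >= max_block_size_rows)
        return true;
    return max_block_size_bytes != 0 && merged_bytes >= max_block_size_bytes;
}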

View File

@ -11,13 +11,14 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
Block header_,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_,
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: header(std::move(header_))
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size_, max_block_size_bytes_)
, description(description_)
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)

View File

@ -17,7 +17,8 @@ public:
Block header_,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_,
size_t max_block_size_bytes_,
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,

View File

@ -17,12 +17,13 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool cleanup_)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size), cleanup(cleanup_)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows, max_block_size_bytes), cleanup(cleanup_)
{
if (!is_deleted_column.empty())
is_deleted_column_number = header_.getPositionByName(is_deleted_column);

View File

@ -23,7 +23,8 @@ public:
SortDescription description_,
const String & is_deleted_column,
const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false);

View File

@ -497,8 +497,8 @@ static void setRow(Row & row, const ColumnRawPtrs & raw_columns, size_t row_num,
SummingSortedAlgorithm::SummingMergedData::SummingMergedData(
MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_)
MutableColumns columns_, UInt64 max_block_size_rows_, UInt64 max_block_size_bytes_, ColumnsDefinition & def_)
: MergedData(std::move(columns_), false, max_block_size_rows_, max_block_size_bytes_)
, def(def_)
{
current_row.resize(def.column_names.size());
@ -686,10 +686,11 @@ SummingSortedAlgorithm::SummingSortedAlgorithm(
SortDescription description_,
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingAlgorithmWithDelayedChunk(header_, num_inputs, std::move(description_))
, columns_definition(defineColumns(header_, description, column_names_to_sum, partition_key_columns))
, merged_data(getMergedDataColumns(header_, columns_definition), max_block_size, columns_definition)
, merged_data(getMergedDataColumns(header_, columns_definition), max_block_size_rows, max_block_size_bytes, columns_definition)
{
}

View File

@ -22,7 +22,8 @@ public:
const Names & column_names_to_sum,
/// List of partition key columns. They have to be excluded.
const Names & partition_key_columns,
size_t max_block_size);
size_t max_block_size_rows,
size_t max_block_size_bytes);
void initialize(Inputs inputs) override;
void consume(Input & input, size_t source_num) override;
@ -63,7 +64,7 @@ public:
using MergedData::insertRow;
public:
SummingMergedData(MutableColumns columns_, UInt64 max_block_size_, ColumnsDefinition & def_);
SummingMergedData(MutableColumns columns_, UInt64 max_block_size_rows, UInt64 max_block_size_bytes_, ColumnsDefinition & def_);
void startGroup(ColumnRawPtrs & raw_columns, size_t row);
void finishGroup();

View File

@ -12,13 +12,14 @@ VersionedCollapsingAlgorithm::VersionedCollapsingAlgorithm(
size_t num_inputs,
SortDescription description_,
const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows_,
size_t max_block_size_bytes_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
: IMergingAlgorithmWithSharedChunks(header_, num_inputs, std::move(description_), out_row_sources_buf_, MAX_ROWS_IN_MULTIVERSION_QUEUE)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, merged_data(header_.cloneEmptyColumns(), use_average_block_sizes, max_block_size_rows_, max_block_size_bytes_)
/// -1 for +1 in FixedSizeDequeWithGaps's internal buffer. 3 is a reasonable minimum size to collapse anything.
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, max_rows_in_queue(std::min(std::max<size_t>(3, max_block_size_rows_), MAX_ROWS_IN_MULTIVERSION_QUEUE) - 1)
, current_keys(max_rows_in_queue)
{
sign_column_number = header_.getPositionByName(sign_column_);

View File

@ -20,7 +20,8 @@ public:
VersionedCollapsingAlgorithm(
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);

View File

@ -16,7 +16,8 @@ public:
SortDescription description_,
const String & sign_column,
bool only_positive_sign,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
@ -26,7 +27,8 @@ public:
std::move(description_),
sign_column,
only_positive_sign,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
&Poco::Logger::get("CollapsingSortedTransform"),
out_row_sources_buf_,
use_average_block_sizes)

View File

@ -17,16 +17,16 @@ public:
size_t num_inputs,
AggregatingTransformParamsPtr params,
SortDescription description,
size_t max_block_size,
size_t max_block_bytes)
size_t max_block_size_rows,
size_t max_block_size_bytes)
: IMergingTransform(
num_inputs, header, {}, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
params,
std::move(description),
max_block_size,
max_block_bytes)
max_block_size_rows,
max_block_size_bytes)
{
}

View File

@ -11,15 +11,20 @@ class GraphiteRollupSortedTransform final : public IMergingTransform<GraphiteRol
{
public:
GraphiteRollupSortedTransform(
const Block & header, size_t num_inputs,
SortDescription description_, size_t max_block_size,
Graphite::Params params_, time_t time_of_merge_)
const Block & header,
size_t num_inputs,
SortDescription description_,
size_t max_block_size_rows,
size_t max_block_size_bytes,
Graphite::Params params_,
time_t time_of_merge_)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
num_inputs,
std::move(description_),
max_block_size,
max_block_size_rows,
max_block_size_bytes,
std::move(params_),
time_of_merge_)
{

View File

@ -11,7 +11,8 @@ MergingSortedTransform::MergingSortedTransform(
const Block & header,
size_t num_inputs,
const SortDescription & description_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_,
bool always_read_till_end_,
@ -29,7 +30,8 @@ MergingSortedTransform::MergingSortedTransform(
header,
num_inputs,
description_,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
sorting_queue_strategy,
limit_,
out_row_sources_buf_,

View File

@ -15,7 +15,8 @@ public:
const Block & header,
size_t num_inputs,
const SortDescription & description,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
SortingQueueStrategy sorting_queue_strategy,
UInt64 limit_ = 0,
bool always_read_till_end_ = false,

View File

@ -15,7 +15,8 @@ public:
const Block & header, size_t num_inputs,
SortDescription description_,
const String & is_deleted_column, const String & version_column,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool cleanup = false)
@ -26,7 +27,8 @@ public:
std::move(description_),
is_deleted_column,
version_column,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes,
cleanup)

View File

@ -17,7 +17,9 @@ public:
/// List of columns to be summed. If empty, all numeric columns that are not in the description are taken.
const Names & column_names_to_sum,
const Names & partition_key_columns,
size_t max_block_size)
size_t max_block_size_rows,
size_t max_block_size_bytes
)
: IMergingTransform(
num_inputs, header, header, /*have_all_inputs_=*/ true, /*limit_hint_=*/ 0, /*always_read_till_end_=*/ false,
header,
@ -25,7 +27,8 @@ public:
std::move(description_),
column_names_to_sum,
partition_key_columns,
max_block_size)
max_block_size_rows,
max_block_size_bytes)
{
}

View File

@ -15,7 +15,8 @@ public:
VersionedCollapsingTransform(
const Block & header, size_t num_inputs,
SortDescription description_, const String & sign_column_,
size_t max_block_size,
size_t max_block_size_rows,
size_t max_block_size_bytes,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false)
: IMergingTransform(
@ -24,7 +25,8 @@ public:
num_inputs,
std::move(description_),
sign_column_,
max_block_size,
max_block_size_rows,
max_block_size_bytes,
out_row_sources_buf_,
use_average_block_sizes)
{

View File

@ -852,7 +852,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, max_block_size, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
pipe.addTransform(std::move(transform));
}
@ -898,31 +898,31 @@ static void addMergingFinal(
{
case MergeTreeData::MergingParams::Ordinary:
return std::make_shared<MergingSortedTransform>(header, num_outputs,
sort_description, max_block_size, SortingQueueStrategy::Batch);
sort_description, max_block_size, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedTransform>(header, num_outputs,
sort_description, merging_params.sign_column, true, max_block_size);
sort_description, merging_params.sign_column, true, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedTransform>(header, num_outputs,
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size);
sort_description, merging_params.columns_to_sum, partition_key_columns, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedTransform>(header, num_outputs,
sort_description, max_block_size);
sort_description, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedTransform>(header, num_outputs,
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
sort_description, merging_params.is_deleted_column, merging_params.version_column, max_block_size, /*max_block_size_bytes=*/0, /*out_row_sources_buf_*/ nullptr, /*use_average_block_sizes*/ false, /*cleanup*/ !merging_params.is_deleted_column.empty());
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingTransform>(header, num_outputs,
sort_description, merging_params.sign_column, max_block_size);
sort_description, merging_params.sign_column, max_block_size, /*max_block_size_bytes=*/0);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedTransform>(header, num_outputs,
sort_description, max_block_size, merging_params.graphite_params, now);
sort_description, max_block_size, /*max_block_size_bytes=*/0, merging_params.graphite_params, now);
}
UNREACHABLE();

View File

@ -176,6 +176,7 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
@ -269,6 +270,7 @@ void SortingStep::fullSort(
pipeline.getNumStreams(),
result_sort_desc,
sort_settings.max_block_size,
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);

View File

@ -186,6 +186,7 @@ void MergeSortingTransform::consume(Chunk chunk)
0,
description,
max_merged_block_size,
/*max_merged_block_size_bytes*/0,
SortingQueueStrategy::Batch,
limit,
/*always_read_till_end_=*/ false,

View File

@ -83,7 +83,7 @@ TEST(MergingSortedTest, SimpleBlockSizeTest)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));
@ -125,7 +125,7 @@ TEST(MergingSortedTest, MoreInterestingBlockSizes)
EXPECT_EQ(pipe.numOutputPorts(), 3);
auto transform = std::make_shared<MergingSortedTransform>(pipe.getHeader(), pipe.numOutputPorts(), sort_description,
DEFAULT_MERGE_BLOCK_SIZE, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
8192, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch, 0, false, nullptr, false, true);
pipe.addTransform(std::move(transform));

View File

@ -1090,7 +1090,11 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
"in a single ALTER query", backQuote(column_name));
if (command.codec)
{
if (all_columns.hasAlias(column_name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cannot specify codec for column type ALIAS");
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs);
}
auto column_default = all_columns.getDefault(column_name);
if (column_default)
{

View File

@ -659,6 +659,12 @@ bool ColumnsDescription::hasPhysical(const String & column_name) const
it->default_desc.kind != ColumnDefaultKind::Alias && it->default_desc.kind != ColumnDefaultKind::Ephemeral;
}
bool ColumnsDescription::hasAlias(const String & column_name) const
{
auto it = columns.get<1>().find(column_name);
return it != columns.get<1>().end() && it->default_desc.kind == ColumnDefaultKind::Alias;
}
bool ColumnsDescription::hasColumnOrSubcolumn(GetColumnsOptions::Kind kind, const String & column_name) const
{
auto it = columns.get<1>().find(column_name);

View File

@ -177,6 +177,7 @@ public:
Names getNamesOfPhysical() const;
bool hasPhysical(const String & column_name) const;
bool hasAlias(const String & column_name) const;
bool hasColumnOrSubcolumn(GetColumnsOptions::Kind kind, const String & column_name) const;
bool hasColumnOrNested(GetColumnsOptions::Kind kind, const String & column_name) const;

View File

@ -463,6 +463,7 @@ void DataPartStorageOnDiskBase::rename(
if (volume->getDisk()->exists(to))
{
/// FIXME it should be logical error
if (remove_new_dir_if_exists)
{
Names files;
@ -473,7 +474,8 @@ void DataPartStorageOnDiskBase::rename(
"Part directory {} already exists and contains {} files. Removing it.",
fullPath(volume->getDisk(), to), files.size());
executeWriteOperation([&](auto & disk) { disk.removeRecursive(to); });
/// Do not remove blobs if they exist
executeWriteOperation([&](auto & disk) { disk.removeSharedRecursive(to, true, {}); });
}
else
{
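
The substance of this hunk is the switch from removeRecursive to removeSharedRecursive(to, true, {}): the stale part directory is removed locally while shared blobs possibly referenced by other replicas are kept. A minimal sketch of that distinction, with stubbed bodies and a simplified signature:

#include <iostream>
#include <set>
#include <string>

// Sketch: keep_all_shared_data == true removes only the local part
// directory; blobs that may still be referenced by other replicas
// survive. Simplified from IDisk::removeSharedRecursive(path,
// keep_all_shared_data, file_names_remain_on_disk); bodies are stubs.
struct DiskSketch
{
    void removeLocalMetadataRecursive(const std::string & path) { std::cout << "rm metadata " << path << "\n"; }
    void removeRemoteBlobsRecursive(const std::string & path) { std::cout << "rm blobs " << path << "\n"; }

    void removeSharedRecursive(const std::string & path, bool keep_all_shared_data, const std::set<std::string> & /*keep_files*/)
    {
        removeLocalMetadataRecursive(path);
        if (!keep_all_shared_data)
            removeRemoteBlobsRecursive(path);
    }
};

int main()
{
    DiskSketch disk;
    disk.removeSharedRecursive("store/part_1_1_0", /*keep_all_shared_data=*/true, {});
}
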

View File

@ -24,7 +24,7 @@ template <typename T>
std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
const String & path_prefix_, const String & temp_path, const ZooKeeperWithFaultInjectionPtr & zookeeper_, const T & deduplication_path)
{
constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
static constexpr bool async_insert = std::is_same_v<T, std::vector<String>>;
String path;
@ -42,16 +42,15 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
if constexpr (async_insert)
{
for (const auto & single_dedup_path : deduplication_path)
{
ops.emplace_back(zkutil::makeCreateRequest(single_dedup_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(single_dedup_path, -1));
}
zkutil::addCheckNotExistsRequest(ops, *zookeeper_, single_dedup_path);
}
else
{
ops.emplace_back(zkutil::makeCreateRequest(deduplication_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(deduplication_path, -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper_, deduplication_path);
}
auto deduplication_path_ops_size = ops.size();
ops.emplace_back(zkutil::makeCreateRequest(path_prefix_, holder_path, zkutil::CreateMode::EphemeralSequential));
Coordination::Responses responses;
Coordination::Error e = zookeeper_->tryMulti(ops, responses);
@ -60,9 +59,10 @@ std::optional<EphemeralLockInZooKeeper> createEphemeralLockInZooKeeper(
if constexpr (async_insert)
{
auto failed_idx = zkutil::getFailedOpIndex(Coordination::Error::ZNODEEXISTS, responses);
if (failed_idx < deduplication_path.size() * 2)
if (failed_idx < deduplication_path_ops_size)
{
const String & failed_op_path = deduplication_path[failed_idx / 2];
const String & failed_op_path = ops[failed_idx]->getPath();
LOG_DEBUG(
&Poco::Logger::get("createEphemeralLockInZooKeeper"),
"Deduplication path already exists: deduplication_path={}",

View File

@ -31,6 +31,7 @@ MergeListElement::MergeListElement(
source_part_paths.emplace_back(source_part->getDataPartStorage().getFullPath());
total_size_bytes_compressed += source_part->getBytesOnDisk();
total_size_bytes_uncompressed += source_part->getTotalColumnsSize().data_uncompressed;
total_size_marks += source_part->getMarksCount();
total_rows_count += source_part->index_granularity.getTotalRows();
}
@ -57,6 +58,7 @@ MergeInfo MergeListElement::getInfo() const
res.progress = progress.load(std::memory_order_relaxed);
res.num_parts = num_parts;
res.total_size_bytes_compressed = total_size_bytes_compressed;
res.total_size_bytes_uncompressed = total_size_bytes_uncompressed;
res.total_size_marks = total_size_marks;
res.total_rows_count = total_rows_count;
res.bytes_read_uncompressed = bytes_read_uncompressed.load(std::memory_order_relaxed);

View File

@ -40,6 +40,7 @@ struct MergeInfo
Float64 progress;
UInt64 num_parts;
UInt64 total_size_bytes_compressed;
UInt64 total_size_bytes_uncompressed;
UInt64 total_size_marks;
UInt64 total_rows_count;
UInt64 bytes_read_uncompressed;
@ -82,6 +83,7 @@ struct MergeListElement : boost::noncopyable
std::atomic<bool> is_cancelled{};
UInt64 total_size_bytes_compressed{};
UInt64 total_size_bytes_uncompressed{};
UInt64 total_size_marks{};
UInt64 total_rows_count{};
std::atomic<UInt64> bytes_read_uncompressed{};

View File

@ -921,7 +921,9 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
/// If the merge is vertical, we cannot calculate it
ctx->blocks_are_granules_size = (global_ctx->chosen_merge_algorithm == MergeAlgorithm::Vertical);
UInt64 merge_block_size = data_settings->merge_max_block_size;
/// There is no sense in having a block size bigger than one granule for merge operations.
const UInt64 merge_block_size_rows = data_settings->merge_max_block_size;
const UInt64 merge_block_size_bytes = data_settings->merge_max_block_size_bytes;
switch (ctx->merging_params.mode)
{
@ -930,7 +932,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
header,
pipes.size(),
sort_description,
merge_block_size,
merge_block_size_rows,
merge_block_size_bytes,
SortingQueueStrategy::Default,
/* limit_= */0,
/* always_read_till_end_= */false,
@ -942,35 +945,35 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_shared<CollapsingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column, false,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
case MergeTreeData::MergingParams::Summing:
merged_transform = std::make_shared<SummingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size);
header, pipes.size(), sort_description, ctx->merging_params.columns_to_sum, partition_key_columns, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Aggregating:
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size);
merged_transform = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes);
break;
case MergeTreeData::MergingParams::Replacing:
merged_transform = std::make_shared<ReplacingSortedTransform>(
header, pipes.size(), sort_description, ctx->merging_params.is_deleted_column, ctx->merging_params.version_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size,
(data_settings->clean_deleted_rows != CleanDeletedRows::Never) || global_ctx->cleanup);
break;
case MergeTreeData::MergingParams::Graphite:
merged_transform = std::make_shared<GraphiteRollupSortedTransform>(
header, pipes.size(), sort_description, merge_block_size,
header, pipes.size(), sort_description, merge_block_size_rows, merge_block_size_bytes,
ctx->merging_params.graphite_params, global_ctx->time_of_merge);
break;
case MergeTreeData::MergingParams::VersionedCollapsing:
merged_transform = std::make_shared<VersionedCollapsingTransform>(
header, pipes.size(), sort_description, ctx->merging_params.sign_column,
merge_block_size, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
merge_block_size_rows, merge_block_size_bytes, ctx->rows_sources_write_buf.get(), ctx->blocks_are_granules_size);
break;
}
@ -1011,7 +1014,8 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm() const
{
const size_t sum_rows_upper_bound = global_ctx->merge_list_element_ptr->total_rows_count;
const size_t total_rows_count = global_ctx->merge_list_element_ptr->total_rows_count;
const size_t total_size_bytes_uncompressed = global_ctx->merge_list_element_ptr->total_size_bytes_uncompressed;
const auto data_settings = global_ctx->data->getSettings();
if (global_ctx->deduplicate)
@ -1042,11 +1046,13 @@ MergeAlgorithm MergeTask::ExecuteAndFinalizeHorizontalPart::chooseMergeAlgorithm
bool enough_ordinary_cols = global_ctx->gathering_columns.size() >= data_settings->vertical_merge_algorithm_min_columns_to_activate;
bool enough_total_rows = sum_rows_upper_bound >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_rows = total_rows_count >= data_settings->vertical_merge_algorithm_min_rows_to_activate;
bool enough_total_bytes = total_size_bytes_uncompressed >= data_settings->vertical_merge_algorithm_min_bytes_to_activate;
bool no_parts_overflow = global_ctx->future_part->parts.size() <= RowSourcePart::MAX_PARTS;
auto merge_alg = (is_supported_storage && enough_total_rows && enough_ordinary_cols && no_parts_overflow) ?
auto merge_alg = (is_supported_storage && enough_total_rows && enough_total_bytes && enough_ordinary_cols && no_parts_overflow) ?
MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
return merge_alg;
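
Vertical merge activation now requires the byte threshold as well as the row threshold. A condensed sketch of the decision; the constants mirror the defaults from MergeTreeSettings and are illustrative:

#include <cstddef>

enum class MergeAlgorithm { Horizontal, Vertical };

// Sketch of the activation rule after this change: Vertical merge is
// chosen only when rows, uncompressed bytes, and non-PK column count
// all clear their thresholds.
MergeAlgorithm chooseMergeAlgorithm(
    size_t total_rows, size_t total_bytes_uncompressed, size_t gathering_columns,
    bool is_supported_storage, bool no_parts_overflow)
{
    const size_t min_rows = 16 * 8192;  // vertical_merge_algorithm_min_rows_to_activate
    const size_t min_bytes = 0;         // vertical_merge_algorithm_min_bytes_to_activate (0 always passes)
    const size_t min_columns = 11;      // vertical_merge_algorithm_min_columns_to_activate

    bool enough_rows = total_rows >= min_rows;
    bool enough_bytes = total_bytes_uncompressed >= min_bytes;
    bool enough_columns = gathering_columns >= min_columns;

    return (is_supported_storage && enough_rows && enough_bytes && enough_columns && no_parts_overflow)
        ? MergeAlgorithm::Vertical : MergeAlgorithm::Horizontal;
}
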

View File

@ -280,23 +280,23 @@ Block MergeTreeDataWriter::mergeBlock(
return nullptr;
case MergeTreeData::MergingParams::Replacing:
return std::make_shared<ReplacingSortedAlgorithm>(
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1);
block, 1, sort_description, merging_params.is_deleted_column, merging_params.version_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Collapsing:
return std::make_shared<CollapsingSortedAlgorithm>(
block, 1, sort_description, merging_params.sign_column,
false, block_size + 1, &Poco::Logger::get("MergeTreeDataWriter"));
false, block_size + 1, /*block_size_bytes=*/0, &Poco::Logger::get("MergeTreeDataWriter"));
case MergeTreeData::MergingParams::Summing:
return std::make_shared<SummingSortedAlgorithm>(
block, 1, sort_description, merging_params.columns_to_sum,
partition_key_columns, block_size + 1);
partition_key_columns, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Aggregating:
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1);
return std::make_shared<AggregatingSortedAlgorithm>(block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::VersionedCollapsing:
return std::make_shared<VersionedCollapsingAlgorithm>(
block, 1, sort_description, merging_params.sign_column, block_size + 1);
block, 1, sort_description, merging_params.sign_column, block_size + 1, /*block_size_bytes=*/0);
case MergeTreeData::MergingParams::Graphite:
return std::make_shared<GraphiteRollupSortedAlgorithm>(
block, 1, sort_description, block_size + 1, merging_params.graphite_params, time(nullptr));
block, 1, sort_description, block_size + 1, /*block_size_bytes=*/0, merging_params.graphite_params, time(nullptr));
}
UNREACHABLE();

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergeTreeIndexReader.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
namespace
{
@ -20,7 +21,7 @@ std::unique_ptr<MergeTreeReaderStream> makeIndexReader(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
return std::make_unique<MergeTreeReaderStream>(
part->getDataPartStoragePtr(),
std::make_shared<LoadedMergeTreeDataPartInfoForReader>(part),
index->getFileName(), extension, marks_count,
all_mark_ranges,
std::move(settings), mark_cache, uncompressed_cache,

View File

@ -30,7 +30,7 @@ namespace ErrorCodes
}
MergeTreeMarksLoader::MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path_,
size_t marks_count_,
@ -39,7 +39,7 @@ MergeTreeMarksLoader::MergeTreeMarksLoader(
const ReadSettings & read_settings_,
ThreadPool * load_marks_threadpool_,
size_t columns_in_mark_)
: data_part_storage(std::move(data_part_storage_))
: data_part_reader(data_part_reader_)
, mark_cache(mark_cache_)
, mrk_path(mrk_path_)
, marks_count(marks_count_)
@ -98,6 +98,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarksImpl()
/// Memory for marks must not be accounted as memory usage for the query, because marks are stored in the shared cache.
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;
auto data_part_storage = data_part_reader->getDataPartStorage();
size_t file_size = data_part_storage->getFileSize(mrk_path);
size_t mark_size = index_granularity_info.getMarkSizeInBytes(columns_in_mark);
size_t expected_uncompressed_size = mark_size * marks_count;
@ -177,6 +179,8 @@ MarkCache::MappedPtr MergeTreeMarksLoader::loadMarks()
{
MarkCache::MappedPtr loaded_marks;
auto data_part_storage = data_part_reader->getDataPartStorage();
if (mark_cache)
{
auto key = mark_cache->hash(fs::path(data_part_storage->getFullPath()) / mrk_path);
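
The loader now stores the reader-info handle and resolves the storage on demand inside loadMarks()/loadMarksImpl(), rather than capturing a DataPartStoragePtr at construction. A minimal sketch of the pattern, with hypothetical interfaces:

#include <memory>
#include <string>

// Hypothetical interfaces mirroring the change: the loader keeps the
// part-info handle and resolves storage lazily at load time.
struct DataPartStorage { std::string getFullPath() const { return "/path/to/part"; } };
using DataPartStoragePtr = std::shared_ptr<DataPartStorage>;

struct PartInfoForReader
{
    DataPartStoragePtr getDataPartStorage() const { return std::make_shared<DataPartStorage>(); }
};

struct MarksLoader
{
    std::shared_ptr<PartInfoForReader> data_part_reader;  // held instead of a DataPartStoragePtr

    void loadMarks()
    {
        auto storage = data_part_reader->getDataPartStorage();  // resolved on demand
        (void)storage->getFullPath();
    }
};

int main()
{
    MarksLoader loader{std::make_shared<PartInfoForReader>()};
    loader.loadMarks();
}
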

View File

@ -1,9 +1,9 @@
#pragma once
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/MarkCache.h>
#include <IO/ReadSettings.h>
#include <Common/ThreadPool_fwd.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -18,7 +18,7 @@ public:
using MarksPtr = MarkCache::MappedPtr;
MergeTreeMarksLoader(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
MarkCache * mark_cache_,
const String & mrk_path,
size_t marks_count_,
@ -33,7 +33,7 @@ public:
MarkInCompressedFile getMark(size_t row_index, size_t column_index = 0);
private:
DataPartStoragePtr data_part_storage;
MergeTreeDataPartInfoForReaderPtr data_part_reader;
MarkCache * mark_cache = nullptr;
String mrk_path;
size_t marks_count;

View File

@ -36,7 +36,7 @@ MergeTreeReaderCompact::MergeTreeReaderCompact(
settings_,
avg_value_size_hints_)
, marks_loader(
data_part_info_for_read_->getDataPartStorage(),
data_part_info_for_read_,
mark_cache,
data_part_info_for_read_->getIndexGranularityInfo().getMarksFilePath(MergeTreeDataPartCompact::DATA_FILE_NAME),
data_part_info_for_read_->getMarksCount(),

View File

@ -15,7 +15,7 @@ namespace ErrorCodes
}
MergeTreeReaderStream::MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,
@ -35,7 +35,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, all_mark_ranges(all_mark_ranges_)
, file_size(file_size_)
, uncompressed_cache(uncompressed_cache_)
, data_part_storage(std::move(data_part_storage_))
, data_part_storage(data_part_reader_->getDataPartStorage())
, path_prefix(path_prefix_)
, data_file_extension(data_file_extension_)
, is_low_cardinality_dictionary(is_low_cardinality_dictionary_)
@ -44,7 +44,7 @@ MergeTreeReaderStream::MergeTreeReaderStream(
, save_marks_in_cache(settings.save_marks_in_cache)
, index_granularity_info(index_granularity_info_)
, marks_loader(
data_part_storage,
data_part_reader_,
mark_cache,
index_granularity_info->getMarksFilePath(path_prefix),
marks_count,

View File

@ -9,6 +9,7 @@
#include <Compression/CompressedReadBufferFromFile.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/MergeTreeMarksLoader.h>
#include <Storages/MergeTree/IMergeTreeDataPartInfoForReader.h>
namespace DB
@ -19,7 +20,7 @@ class MergeTreeReaderStream
{
public:
MergeTreeReaderStream(
DataPartStoragePtr data_part_storage_,
MergeTreeDataPartInfoForReaderPtr data_part_reader_,
const String & path_prefix_,
const String & data_file_extension_,
size_t marks_count_,

View File

@ -242,7 +242,7 @@ void MergeTreeReaderWide::addStreams(
auto * load_marks_threadpool = settings.read_settings.load_marks_asynchronously ? &context->getLoadMarksThreadpool() : nullptr;
streams.emplace(stream_name, std::make_unique<MergeTreeReaderStream>(
data_part_info_for_read->getDataPartStorage(), stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read, stream_name, DATA_FILE_EXTENSION,
data_part_info_for_read->getMarksCount(), all_mark_ranges, settings, mark_cache,
uncompressed_cache, data_part_info_for_read->getFileSizeOrZero(stream_name + DATA_FILE_EXTENSION),
&data_part_info_for_read->getIndexGranularityInfo(),

View File

@ -40,7 +40,8 @@ struct Settings;
M(Float, ratio_of_defaults_for_sparse_serialization, 1.0, "Minimal ratio of the number of default values to the number of all values in a column to store it in sparse serialization. If >= 1, columns will always be written in full serialization.", 0) \
\
/** Merge settings. */ \
M(UInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \
M(UInt64, merge_max_block_size, 8192, "How many rows in blocks should be formed for merge operations. By default it has the same value as `index_granularity`.", 0) \
M(UInt64, merge_max_block_size_bytes, 10 * 1024 * 1024, "How many bytes in blocks should be formed for merge operations. By default it has the same value as `index_granularity_bytes`.", 0) \
M(UInt64, max_bytes_to_merge_at_max_space_in_pool, 150ULL * 1024 * 1024 * 1024, "Maximum in total size of parts to merge, when there are maximum free threads in background pool (or entries in replication queue).", 0) \
M(UInt64, max_bytes_to_merge_at_min_space_in_pool, 1024 * 1024, "Maximum in total size of parts to merge, when there are minimum free threads in background pool (or entries in replication queue).", 0) \
M(UInt64, max_replicated_merges_in_queue, 1000, "How many tasks of merging and mutating parts are allowed simultaneously in ReplicatedMergeTree queue.", 0) \
@ -126,7 +127,8 @@ struct Settings;
M(UInt64, min_relative_delay_to_close, 300, "Minimal delay from other replicas to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, min_absolute_delay_to_close, 0, "Minimal absolute delay to close, stop serving requests and not return Ok during status check.", 0) \
M(UInt64, enable_vertical_merge_algorithm, 1, "Enable usage of Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * DEFAULT_MERGE_BLOCK_SIZE, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_rows_to_activate, 16 * 8192, "Minimal (approximate) sum of rows in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_bytes_to_activate, 0, "Minimal (approximate) uncompressed size in bytes in merging parts to activate Vertical merge algorithm.", 0) \
M(UInt64, vertical_merge_algorithm_min_columns_to_activate, 11, "Minimal amount of non-PK columns to activate Vertical merge algorithm.", 0) \
\
/** Compatibility settings */ \
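
The two settings cap merge blocks by whichever binds first: with the default 10 MiB byte cap, ~200-byte rows are still cut at 8192 rows, while ~10 KiB rows are cut at roughly 1024 rows. A tiny arithmetic illustration (the row sizes are assumed examples):

#include <algorithm>
#include <cstdio>
#include <initializer_list>

int main()
{
    const unsigned long long max_rows = 8192;                  // merge_max_block_size
    const unsigned long long max_bytes = 10ULL * 1024 * 1024;  // merge_max_block_size_bytes

    for (unsigned long long row_bytes : {200ULL, 10240ULL})
    {
        unsigned long long rows_by_bytes = max_bytes / row_bytes;
        std::printf("row size %llu B -> block of ~%llu rows\n", row_bytes, std::min(max_rows, rows_by_bytes));
    }
}
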

View File

@ -220,12 +220,13 @@ HashJoinPtr StorageJoin::getJoinLocked(std::shared_ptr<TableJoin> analyzed_join,
Names left_key_names_resorted;
for (const auto & key_name : key_names)
{
const auto & renamed_key = analyzed_join->renamedRightColumnName(key_name);
const auto & renamed_key = analyzed_join->renamedRightColumnNameWithAlias(key_name);
/// find position of renamed_key in key_names_right
auto it = std::find(key_names_right.begin(), key_names_right.end(), renamed_key);
if (it == key_names_right.end())
throw Exception(ErrorCodes::INCOMPATIBLE_TYPE_OF_JOIN,
"Key '{}' not found in JOIN ON section. All Join engine keys '{}' have to be used", key_name, fmt::join(key_names, ", "));
"Key '{}' not found in JOIN ON section. Join engine key{} '{}' have to be used",
key_name, key_names.size() > 1 ? "s" : "", fmt::join(key_names, ", "));
const size_t key_position = std::distance(key_names_right.begin(), it);
left_key_names_resorted.push_back(key_names_left[key_position]);
}

View File

@ -1330,6 +1330,11 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
uncovered_unexpected_parts.size(), uncovered_unexpected_parts_rows, unexpected_parts_nonnew, unexpected_parts_nonnew_rows,
parts_to_fetch.size(), parts_to_fetch_blocks, covered_unexpected_parts.size(), unexpected_parts_rows - uncovered_unexpected_parts_rows);
}
else
{
if (!parts_to_fetch.empty())
LOG_DEBUG(log, "Found parts to fetch (exist in zookeeper, but not locally): [{}]", fmt::join(parts_to_fetch, ", "));
}
/// Add to the queue jobs to pick up the missing parts from other replicas and remove from ZK the information that we have them.
queue.setBrokenPartsToEnqueueFetchesOnLoading(std::move(parts_to_fetch));
@ -2470,8 +2475,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
{
/// We check that it was not suddenly upgraded to a new version.
/// Otherwise it could be upgraded and instantly become lost, and we would not be able to notice that.
ops.push_back(zkutil::makeCreateRequest(fs::path(source_path) / "is_lost", "0", zkutil::CreateMode::Persistent));
ops.push_back(zkutil::makeRemoveRequest(fs::path(source_path) / "is_lost", -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper, fs::path(source_path) / "is_lost");
}
else /// The replica we clone should not suddenly become lost.
ops.push_back(zkutil::makeCheckRequest(fs::path(source_path) / "is_lost", source_is_lost_stat.version));
@ -8874,8 +8878,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
/// We must be sure that this part doesn't exist on other replicas
if (!zookeeper->exists(current_part_path))
{
ops.emplace_back(zkutil::makeCreateRequest(current_part_path, "", zkutil::CreateMode::Persistent));
ops.emplace_back(zkutil::makeRemoveRequest(current_part_path, -1));
zkutil::addCheckNotExistsRequest(ops, *zookeeper, current_part_path);
}
else
{

View File

@ -22,6 +22,7 @@ NamesAndTypesList StorageSystemMerges::getNamesAndTypes()
{"partition_id", std::make_shared<DataTypeString>()},
{"is_mutation", std::make_shared<DataTypeUInt8>()},
{"total_size_bytes_compressed", std::make_shared<DataTypeUInt64>()},
{"total_size_bytes_uncompressed", std::make_shared<DataTypeUInt64>()},
{"total_size_marks", std::make_shared<DataTypeUInt64>()},
{"bytes_read_uncompressed", std::make_shared<DataTypeUInt64>()},
{"rows_read", std::make_shared<DataTypeUInt64>()},
@ -59,6 +60,7 @@ void StorageSystemMerges::fillData(MutableColumns & res_columns, ContextPtr cont
res_columns[i++]->insert(merge.partition_id);
res_columns[i++]->insert(merge.is_mutation);
res_columns[i++]->insert(merge.total_size_bytes_compressed);
res_columns[i++]->insert(merge.total_size_bytes_uncompressed);
res_columns[i++]->insert(merge.total_size_marks);
res_columns[i++]->insert(merge.bytes_read_uncompressed);
res_columns[i++]->insert(merge.rows_read);

View File

@ -101,44 +101,45 @@ def run_s3_mocks(cluster):
)
def list_objects(cluster, path="data/"):
def list_objects(cluster, path="data/", hint="list_objects"):
minio = cluster.minio_client
objects = list(minio.list_objects(cluster.minio_bucket, path, recursive=True))
logging.info(f"list_objects ({len(objects)}): {[x.object_name for x in objects]}")
logging.info(f"{hint} ({len(objects)}): {[x.object_name for x in objects]}")
return objects
def wait_for_delete_s3_objects(cluster, expected, timeout=30):
minio = cluster.minio_client
while timeout > 0:
if (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
):
if len(list_objects(cluster, "data/")) == expected:
return
timeout -= 1
time.sleep(1)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== expected
)
assert len(list_objects(cluster, "data/")) == expected
@pytest.fixture(autouse=True)
@pytest.mark.parametrize("node_name", ["node"])
def drop_table(cluster, node_name):
yield
node = cluster.instances[node_name]
def remove_all_s3_objects(cluster):
minio = cluster.minio_client
for obj in list_objects(cluster, "data/"):
minio.remove_object(cluster.minio_bucket, obj.object_name)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
@pytest.fixture(autouse=True, scope="function")
def clear_minio(cluster):
try:
wait_for_delete_s3_objects(cluster, 0)
finally:
# ClickHouse does some writes to S3 at startup, e.g. the file data/clickhouse_access_check_{server_uuid}.
# Set the timeout to 10 sec to resolve the race with the existence of that file.
wait_for_delete_s3_objects(cluster, 0, timeout=10)
except:
# Remove extra objects to prevent cascading test failures
for obj in list_objects(cluster, "data/"):
minio.remove_object(cluster.minio_bucket, obj.object_name)
remove_all_s3_objects(cluster)
yield
def check_no_objects_after_drop(cluster, table_name="s3_test", node_name="node"):
node = cluster.instances[node_name]
node.query(f"DROP TABLE IF EXISTS {table_name} NO DELAY")
wait_for_delete_s3_objects(cluster, 0, timeout=0)
@pytest.mark.parametrize(
@ -158,10 +159,7 @@ def test_simple_insert_select(
values1 = generate_values("2020-01-03", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values1))
assert node.query("SELECT * FROM s3_test order by dt, id FORMAT Values") == values1
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD + files_per_part
values2 = generate_values("2020-01-04", 4096)
node.query("INSERT INTO s3_test VALUES {}".format(values2))
@ -169,15 +167,14 @@ def test_simple_insert_select(
node.query("SELECT * FROM s3_test ORDER BY dt, id FORMAT Values")
== values1 + "," + values2
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD + files_per_part * 2
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD + files_per_part * 2
assert (
node.query("SELECT count(*) FROM s3_test where id = 1 FORMAT Values") == "(2)"
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("merge_vertical,node_name", [(True, "node"), (False, "node")])
def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
@ -188,7 +185,6 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test", **settings)
minio = cluster.minio_client
node.query("SYSTEM STOP MERGES s3_test")
node.query(
@ -214,7 +210,7 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
node.query("SELECT count(distinct(id)) FROM s3_test FORMAT Values") == "(8192)"
)
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD_PER_PART_WIDE * 6 + FILES_OVERHEAD
)
@ -242,6 +238,8 @@ def test_insert_same_partition_and_merge(cluster, merge_vertical, node_name):
cluster, FILES_OVERHEAD_PER_PART_WIDE + FILES_OVERHEAD, timeout=45
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_alter_table_columns(cluster, node_name):
@ -287,12 +285,13 @@ def test_alter_table_columns(cluster, node_name):
cluster, FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE + 2
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_attach_detach_partition(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@ -355,12 +354,13 @@ def test_attach_detach_partition(cluster, node_name):
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 0
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_move_partition_to_another_disk(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@ -370,30 +370,31 @@ def test_move_partition_to_another_disk(cluster, node_name):
)
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 'hdd'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE
)
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-04' TO DISK 's3'")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_table_manipulations(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@ -405,9 +406,10 @@ def test_table_manipulations(cluster, node_name):
node.query("RENAME TABLE s3_test TO s3_renamed")
assert node.query("SELECT count(*) FROM s3_renamed FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
node.query("RENAME TABLE s3_renamed TO s3_test")
assert node.query("CHECK TABLE s3_test FORMAT Values") == "(1)"
@ -416,7 +418,7 @@ def test_table_manipulations(cluster, node_name):
node.query("ATTACH TABLE s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 2
)
@ -424,17 +426,15 @@ def test_table_manipulations(cluster, node_name):
wait_for_delete_empty_parts(node, "s3_test")
wait_for_delete_inactive_parts(node, "s3_test")
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(0)"
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
assert len(list_objects(cluster, "data/")) == FILES_OVERHEAD
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_move_replace_partition_to_another_table(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@ -451,12 +451,10 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object at start", obj.object_name)
assert len(s3_objects) == FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
assert (
len(list_objects(cluster, "data/", "Objects at start"))
== FILES_OVERHEAD + FILES_OVERHEAD_PER_PART_WIDE * 4
)
create_table(node, "s3_clone")
node.query("ALTER TABLE s3_test MOVE PARTITION '2020-01-03' TO TABLE s3_clone")
@ -465,10 +463,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(8192)"
assert node.query("SELECT sum(id) FROM s3_clone FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_clone FORMAT Values") == "(8192)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after move partition", obj.object_name)
list_objects(cluster, "data/", "Object after move partition")
# Number of objects in S3 should be unchanged.
wait_for_delete_s3_objects(
cluster,
@ -486,10 +482,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
)
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after insert", obj.object_name)
list_objects(cluster, "data/", "Object after insert")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD * 2
@ -515,12 +509,8 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
node.query("DROP TABLE s3_clone NO DELAY")
assert node.query("SELECT sum(id) FROM s3_test FORMAT Values") == "(0)"
assert node.query("SELECT count(*) FROM s3_test FORMAT Values") == "(16384)"
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after drop", obj.object_name)
# Data should remain in S3
list_objects(cluster, "data/", "Object after drop")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD
@ -530,10 +520,7 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
node.query("ALTER TABLE s3_test FREEZE")
# The number of S3 objects should be unchanged.
s3_objects = list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True))
for obj in s3_objects:
print("Object after freeze", obj.object_name)
list_objects(cluster, "data/", "Object after freeze")
wait_for_delete_s3_objects(
cluster,
FILES_OVERHEAD
@ -548,15 +535,13 @@ def test_move_replace_partition_to_another_table(cluster, node_name):
cluster, FILES_OVERHEAD_PER_PART_WIDE * 4 - FILES_OVERHEAD_METADATA_VERSION * 4
)
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
remove_all_s3_objects(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_freeze_unfreeze(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-03", 4096))
@ -571,7 +556,7 @@ def test_freeze_unfreeze(cluster, node_name):
wait_for_delete_empty_parts(node, "s3_test")
wait_for_delete_inactive_parts(node, "s3_test")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD
+ (FILES_OVERHEAD_PER_PART_WIDE - FILES_OVERHEAD_METADATA_VERSION) * 2
)
@ -583,13 +568,10 @@ def test_freeze_unfreeze(cluster, node_name):
# Unfreeze all partitions from backup2.
node.query("ALTER TABLE s3_test UNFREEZE WITH NAME 'backup2'")
# Data should be removed from S3.
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@ -597,7 +579,6 @@ def test_freeze_system_unfreeze(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
create_table(node, "s3_test_removed")
minio = cluster.minio_client
node.query(
"INSERT INTO s3_test VALUES {}".format(generate_values("2020-01-04", 4096))
@ -613,7 +594,7 @@ def test_freeze_system_unfreeze(cluster, node_name):
wait_for_delete_inactive_parts(node, "s3_test")
node.query("DROP TABLE s3_test_removed NO DELAY")
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
len(list_objects(cluster, "data/"))
== FILES_OVERHEAD
+ (FILES_OVERHEAD_PER_PART_WIDE - FILES_OVERHEAD_METADATA_VERSION) * 2
)
@ -621,13 +602,10 @@ def test_freeze_system_unfreeze(cluster, node_name):
# Unfreeze all data from backup3.
node.query("SYSTEM UNFREEZE WITH NAME 'backup3'")
# Data should be removed from S3.
wait_for_delete_s3_objects(cluster, FILES_OVERHEAD)
# Data should be removed from S3.
assert (
len(list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)))
== FILES_OVERHEAD
)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@ -673,6 +651,8 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
# There should be 3 times more S3 requests because multi-part upload mode uses 3 requests to upload object.
assert get_s3_requests() - s3_requests_before == s3_requests_to_write_partition * 3
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
def test_s3_no_delete_objects(cluster, node_name):
@ -681,6 +661,7 @@ def test_s3_no_delete_objects(cluster, node_name):
node, "s3_test_no_delete_objects", storage_policy="no_delete_objects_s3"
)
node.query("DROP TABLE s3_test_no_delete_objects SYNC")
remove_all_s3_objects(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@ -695,6 +676,7 @@ def test_s3_disk_reads_on_unstable_connection(cluster, node_name):
assert node.query("SELECT sum(id) FROM s3_test").splitlines() == [
"40499995500000"
]
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@ -704,14 +686,13 @@ def test_lazy_seek_optimization_for_async_read(cluster, node_name):
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY key SETTINGS storage_policy='s3';"
)
node.query("SYSTEM STOP MERGES s3_test")
node.query(
"INSERT INTO s3_test SELECT * FROM generateRandom('key UInt32, value String') LIMIT 10000000"
)
node.query("SELECT * FROM s3_test WHERE value LIKE '%abc%' ORDER BY value LIMIT 10")
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
minio = cluster.minio_client
for obj in list(minio.list_objects(cluster.minio_bucket, "data/", recursive=True)):
minio.remove_object(cluster.minio_bucket, obj.object_name)
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node_with_limited_disk"])
@ -721,6 +702,7 @@ def test_cache_with_full_disk_space(cluster, node_name):
node.query(
"CREATE TABLE s3_test (key UInt32, value String) Engine=MergeTree() ORDER BY value SETTINGS storage_policy='s3_with_cache_and_jbod';"
)
node.query("SYSTEM STOP MERGES s3_test")
node.query(
"INSERT INTO s3_test SELECT number, toString(number) FROM numbers(100000000)"
)
@ -739,7 +721,7 @@ def test_cache_with_full_disk_space(cluster, node_name):
assert node.contains_in_log(
"Insert into cache is skipped due to insufficient disk space"
)
node.query("DROP TABLE IF EXISTS s3_test NO DELAY")
check_no_objects_after_drop(cluster, node_name=node_name)
@pytest.mark.parametrize("node_name", ["node"])
@ -764,6 +746,7 @@ def test_store_cleanup_disk_s3(cluster, node_name):
"CREATE TABLE s3_test UUID '00000000-1000-4000-8000-000000000001' (n UInt64) Engine=MergeTree() ORDER BY n SETTINGS storage_policy='s3';"
)
node.query("INSERT INTO s3_test SELECT 1")
check_no_objects_after_drop(cluster)
@pytest.mark.parametrize("node_name", ["node"])
@ -840,3 +823,5 @@ def test_cache_setting_compatibility(cluster, node_name):
node.query("SELECT * FROM s3_test FORMAT Null")
assert not node.contains_in_log("No such file or directory: Cache info:")
check_no_objects_after_drop(cluster)

View File

@ -246,18 +246,18 @@ toUnixTimestamp
1426415400
1426415400
date_trunc
2019-01-01
2020-01-01
2020-01-01
2019-10-01
2020-01-01
2020-01-01
2019-12-01
2020-01-01
2020-01-01
2019-12-30
2019-12-30
2019-12-30
2019-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-10-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-31 00:00:00
2020-01-01 00:00:00
2020-01-02 00:00:00
@ -270,18 +270,18 @@ date_trunc
2019-12-31 20:11:22
2020-01-01 12:11:22
2020-01-02 05:11:22
2019-01-01
2020-01-01
2020-01-01
2019-10-01
2020-01-01
2020-01-01
2019-12-01
2020-01-01
2020-01-01
2019-12-30
2019-12-30
2019-12-30
2019-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-10-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-30 00:00:00
2019-12-31 00:00:00
2020-01-01 00:00:00
2020-01-02 00:00:00
@ -294,8 +294,8 @@ date_trunc
2019-12-31 20:11:22
2020-01-01 12:11:22
2020-01-02 05:11:22
2020-01-01
2020-01-01
2020-01-01
2019-12-30
2020-01-01 00:00:00
2020-01-01 00:00:00
2020-01-01 00:00:00
2019-12-30 00:00:00
2020-01-01 00:00:00

View File

@ -5,7 +5,7 @@
-1275 -424.99999983 -255 -1275 -424.99999983 -255
101 101 101 101 101 101
-101 -101 -101 -101 -101 -101
(101,101,101) (101,101,101) (101,101,101) (101,101,101) (102,100,101)
(101,101,101) (101,101,101) (101,101,101) (101,101,101) (1,1,1,1,1,1)
5 5 5
10 10 10
-50 -50 -16.66666666 -16.66666666 -10 -10

View File

@ -24,7 +24,7 @@ SELECT (uniq(a), uniq(b), uniq(c)),
(uniqCombined(a), uniqCombined(b), uniqCombined(c)),
(uniqCombined(17)(a), uniqCombined(17)(b), uniqCombined(17)(c)),
(uniqExact(a), uniqExact(b), uniqExact(c)),
(uniqHLL12(a), uniqHLL12(b), uniqHLL12(c))
(102 - uniqHLL12(a) >= 0, 102 - uniqHLL12(b) >= 0, 102 - uniqHLL12(c) >= 0, uniqHLL12(a) - 99 >= 0, uniqHLL12(b) - 99 >= 0, uniqHLL12(c) - 99 >= 0)
FROM (SELECT * FROM decimal ORDER BY a);
SELECT uniqUpTo(10)(a), uniqUpTo(10)(b), uniqUpTo(10)(c) FROM decimal WHERE a >= 0 AND a < 5;

View File

@ -135,13 +135,13 @@ Code: 43
------------------------------------------
SELECT date_trunc(\'year\', N, \'Asia/Istanbul\')
Code: 43
"Date","2019-01-01"
"Date","2019-01-01"
"DateTime('Asia/Istanbul')","2019-01-01 00:00:00"
"DateTime('Asia/Istanbul')","2019-01-01 00:00:00"
------------------------------------------
SELECT date_trunc(\'month\', N, \'Asia/Istanbul\')
Code: 43
"Date","2019-09-01"
"Date","2019-09-01"
"DateTime('Asia/Istanbul')","2019-09-01 00:00:00"
"DateTime('Asia/Istanbul')","2019-09-01 00:00:00"
------------------------------------------
SELECT date_trunc(\'day\', N, \'Asia/Istanbul\')
"DateTime('Asia/Istanbul')","2019-09-16 00:00:00"

View File

@ -63,7 +63,6 @@ function thread6()
done
}
# https://stackoverflow.com/questions/9954794/execute-a-shell-function-with-timeout
export -f thread1;
export -f thread2;

View File

@ -18,22 +18,18 @@ Response 0 Create /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 4
Request 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Exists /test/01158/default/rmt/replicas/1/parts/all_0_0_0 0 0 \N 0 0 ZOK \N \N 0 0 96 0
blocks
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 3 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 3 0 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZOK \N \N /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 0 0
Response 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 3 ZOK \N \N /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 0 0
Request 0 Multi 0 0 \N 3 0 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 \N \N \N 0 0 0 0
Request 0 Remove /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 3 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 3 0 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 1 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 2 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 3 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Request 0 Multi 0 0 \N 2 0 \N \N \N 0 0 0 0
Request 0 CheckNotExists /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 1 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 2 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 2 0 ZOK \N \N 0 0 0 0
Response 0 CheckNotExists /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 1 ZOK \N \N 0 0 0 0
Response 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 2 ZOK \N \N /test/01158/default/rmt/block_numbers/all/block-0000000000 0 0 0 0
Request 0 Multi 0 0 \N 2 0 \N \N \N 0 0 0 0
Request 0 CheckNotExists /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 1 \N \N \N 0 0 0 0
Request 0 Create /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 2 \N \N \N 0 0 0 0
Response 0 Multi 0 0 \N 2 0 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 -1 0 1 ZNODEEXISTS \N \N 0 0 0 0
Response 0 Error /test/01158/default/rmt/block_numbers/all/block- 1 1 \N 0 2 ZRUNTIMEINCONSISTENCY \N \N 0 0 0 0
Request 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 \N \N \N 0 0 0 0
Response 0 Get /test/01158/default/rmt/blocks/all_6308706741995381342_2495791770474910886 0 0 \N 0 0 ZOK \N \N 0 0 9 0
duration_ms

View File

@ -1,5 +1,4 @@
#!/usr/bin/env bash
# Tags: long
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -8,7 +7,8 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
verify_sql="SELECT
(SELECT sumIf(value, metric = 'PartsActive'), sumIf(value, metric = 'PartsOutdated') FROM system.metrics)
= (SELECT sum(active), sum(NOT active) FROM system.parts)"
= (SELECT sum(active), sum(NOT active) FROM
(SELECT active FROM system.parts UNION ALL SELECT active FROM system.projection_parts))"
# The query is not atomic - it can compare states between system.parts and system.metrics from different points in time.
# So, there is an inherent race condition. But it should eventually produce the expected result.

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: long, no-s3-storage
# Tags: no-s3-storage
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
@ -11,7 +11,8 @@ set -o pipefail
# NOTE: database = $CLICKHOUSE_DATABASE is unwanted
verify_sql="SELECT
(SELECT sumIf(value, metric = 'PartsInMemory'), sumIf(value, metric = 'PartsCompact'), sumIf(value, metric = 'PartsWide') FROM system.metrics) =
(SELECT countIf(part_type == 'InMemory'), countIf(part_type == 'Compact'), countIf(part_type == 'Wide') FROM system.parts)"
(SELECT countIf(part_type == 'InMemory'), countIf(part_type == 'Compact'), countIf(part_type == 'Wide')
FROM (SELECT part_type FROM system.parts UNION ALL SELECT part_type FROM system.projection_parts))"
# The query is not atomic - it can compare states between system.parts and system.metrics from different points in time.
# So, there is an inherent race condition (especially in fasttest, which runs tests in parallel).

View File

@ -361,6 +361,7 @@ CREATE TABLE system.merges
`partition_id` String,
`is_mutation` UInt8,
`total_size_bytes_compressed` UInt64,
`total_size_bytes_uncompressed` UInt64,
`total_size_marks` UInt64,
`bytes_read_uncompressed` UInt64,
`rows_read` UInt64,

View File

@ -1,5 +1,7 @@
-- Tags: no-fasttest
SET send_logs_level = 'fatal';
drop table if exists rmt;
drop table if exists rmt2;

View File

@ -0,0 +1,3 @@
serialized state is not used 1
serialized state is used 1
via Distributed 1

View File

@ -0,0 +1,36 @@
DROP TABLE IF EXISTS 02713_seqt;
DROP TABLE IF EXISTS 02713_seqt_distr;
SELECT
'serialized state is not used', sequenceMatch('(?1)(?2)')(time, number_ = 1, number_ = 0) AS seq
FROM
(
SELECT
number AS time,
number % 2 AS number_
FROM numbers_mt(100)
);
CREATE TABLE 02713_seqt
ENGINE = MergeTree
ORDER BY n AS
SELECT
sequenceMatchState('(?1)(?2)')(time, number_ = 1, number_ = 0) AS seq,
1 AS n
FROM
(
SELECT
number AS time,
number % 2 AS number_
FROM numbers_mt(100)
);
SELECT 'serialized state is used', sequenceMatchMerge('(?1)(?2)')(seq) AS seq
FROM 02713_seqt;
CREATE TABLE 02713_seqt_distr ( seq AggregateFunction(sequenceMatch('(?1)(?2)'), UInt64, UInt8, UInt8) , n UInt8) ENGINE = Distributed(test_shard_localhost, currentDatabase(), '02713_seqt');
SELECT 'via Distributed', sequenceMatchMerge('(?1)(?2)')(seq) AS seq FROM 02713_seqt_distr;

View File

@ -0,0 +1,6 @@
0
0
0
0
0
0

View File

@ -0,0 +1,21 @@
CREATE TABLE user(id UInt32, name String) ENGINE = Join(ANY, LEFT, id);
INSERT INTO user VALUES (1,'U1')(2,'U2')(3,'U3');
CREATE TABLE product(id UInt32, name String, cate String) ENGINE = Join(ANY, LEFT, id);
INSERT INTO product VALUES (1,'P1','C1')(2,'P2','C1')(3,'P3','C2');
CREATE TABLE order(id UInt32, pId UInt32, uId UInt32) ENGINE = TinyLog;
INSERT INTO order VALUES (1,1,1)(2,1,2)(3,2,3);
SELECT ignore(*) FROM (
SELECT
uId,
user.id as `uuu`
FROM order
LEFT ANY JOIN user
ON uId = `uuu`
);
SELECT ignore(*) FROM order
LEFT ANY JOIN user ON uId = user.id
LEFT ANY JOIN product ON pId = product.id;

View File

@ -0,0 +1,7 @@
drop table if exists alias_column_should_not_allow_compression;
create table if not exists alias_column_should_not_allow_compression ( user_id UUID, user_id_hashed ALIAS (cityHash64(user_id))) engine=MergeTree() order by tuple();
create table if not exists alias_column_should_not_allow_compression_fail ( user_id UUID, user_id_hashed ALIAS (cityHash64(user_id)) codec(LZ4HC(1))) engine=MergeTree() order by tuple(); -- { serverError BAD_ARGUMENTS }
alter table alias_column_should_not_allow_compression modify column user_id codec(LZ4HC(1));
alter table alias_column_should_not_allow_compression modify column user_id_hashed codec(LZ4HC(1)); -- { serverError BAD_ARGUMENTS }
alter table alias_column_should_not_allow_compression add column user_id_hashed_1 UInt64 ALIAS (cityHash64(user_id)) codec(LZ4HC(1)); -- { serverError BAD_ARGUMENTS }
drop table if exists alias_column_should_not_allow_compression;

Some files were not shown because too many files have changed in this diff