Merge branch 'master' into BLAKE3

Commit 24f3b00193 by BoloniniD, 2022-08-06 19:48:10 +03:00, committed via GitHub (GPG Key ID: 4AEE18F83AFDEB23; no known key found for this signature in database).
200 changed files with 2969 additions and 1436 deletions

View File

@ -2,8 +2,47 @@
#include <cstring>
#include <type_traits>
#include <bit>
inline void reverseMemcpy(void * dst, const void * src, size_t size)
{
uint8_t * uint_dst = reinterpret_cast<uint8_t *>(dst);
const uint8_t * uint_src = reinterpret_cast<const uint8_t *>(src);
uint_dst += size;
while (size)
{
--uint_dst;
*uint_dst = *uint_src;
++uint_src;
--size;
}
}
template <typename T>
inline T unalignedLoadLE(const void * address)
{
T res {};
if constexpr (std::endian::native == std::endian::little)
memcpy(&res, address, sizeof(res));
else
reverseMemcpy(&res, address, sizeof(res));
return res;
}
template <typename T>
inline void unalignedStoreLE(void * address,
const typename std::enable_if<true, T>::type & src)
{
static_assert(std::is_trivially_copyable_v<T>);
if constexpr (std::endian::native == std::endian::little)
memcpy(address, &src, sizeof(src));
else
reverseMemcpy(address, &src, sizeof(src));
}
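For context, the helpers above load and store integers in little-endian byte order regardless of the host's native endianness; the SipHash change further down switches to them so hashes come out identical on little- and big-endian machines. A minimal standalone sketch of the same pattern (the names loadLE and reverseCopy are mine, not the ClickHouse header):

``` cpp
#include <bit>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <type_traits>

// Copy `size` bytes from src to dst in reverse order (used on big-endian hosts).
inline void reverseCopy(void * dst, const void * src, std::size_t size)
{
    auto * d = static_cast<uint8_t *>(dst) + size;
    const auto * s = static_cast<const uint8_t *>(src);
    while (size--)
        *--d = *s++;
}

template <typename T>
T loadLE(const void * address)
{
    static_assert(std::is_trivially_copyable_v<T>);
    T res{};
    if constexpr (std::endian::native == std::endian::little)
        std::memcpy(&res, address, sizeof(res));
    else
        reverseCopy(&res, address, sizeof(res));
    return res;
}

int main()
{
    // The same four bytes decode to the same number on any host.
    const uint8_t bytes[4] = {0x78, 0x56, 0x34, 0x12};
    std::printf("%#x\n", static_cast<unsigned>(loadLE<uint32_t>(bytes)));   // prints 0x12345678
    return 0;
}
```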
template <typename T>
inline T unalignedLoad(const void * address)
{

contrib/NuRaft vendored (2 changes)

@ -1 +1 @@
Subproject commit 2ef198694e10c86175ee6ead389346d199060437
Subproject commit 1b0af760b3506b8e35b50cb7df098cbad5064ff2

contrib/azure vendored (2 changes)

@ -1 +1 @@
Subproject commit ac4b763d4ca40122275f1497cbdc5451337461d9
Subproject commit ef75afc075fc71fbcd8fe28dcda3794ae265fd1c

View File

@ -1,6 +1,6 @@
option (ENABLE_AZURE_BLOB_STORAGE "Enable Azure blob storage" ${ENABLE_LIBRARIES})
if (NOT ENABLE_AZURE_BLOB_STORAGE)
if (NOT ENABLE_AZURE_BLOB_STORAGE OR BUILD_STANDALONE_KEEPER OR OS_FREEBSD)
message(STATUS "Not using Azure blob storage")
return()
endif()

View File

@ -62,7 +62,7 @@ def pre_build(repo_path: str, env_variables: List[str]):
f"git -C {repo_path} fetch --no-recurse-submodules "
"--no-tags origin master:master"
)
logging.info("Getting master branch for performance artifact: ''%s'", cmd)
logging.info("Getting master branch for performance artifact: '%s'", cmd)
subprocess.check_call(cmd, shell=True)

View File

@ -57,7 +57,15 @@ do
# check if variable not empty
[ -z "$dir" ] && continue
# ensure directories exist
if ! mkdir -p "$dir"; then
if [ "$DO_CHOWN" = "1" ]; then
mkdir="mkdir"
else
# if DO_CHOWN=0 it means that the system does not map root user to "admin" permissions
# it mainly happens on NFS mounts where root==nobody for security reasons
# thus mkdir MUST run with user id/gid and not from nobody that has zero permissions
mkdir="/usr/bin/clickhouse su "${USER}:${GROUP}" mkdir"
fi
if ! $mkdir -p "$dir"; then
echo "Couldn't create necessary directory: $dir"
exit 1
fi

View File

@ -878,8 +878,6 @@ User can assign new big parts to different disks of a [JBOD](https://en.wikipedi
`MergeTree` family table engines can store data to [S3](https://aws.amazon.com/s3/) using a disk with type `s3`.
This feature is under development and not ready for production. There are known drawbacks such as very low performance.
Configuration markup:
``` xml
<storage_configuration>

View File

@ -36,15 +36,4 @@ then
# Push to GitHub rewriting the existing contents.
# Sometimes it does not work with error message "! [remote rejected] master -> master (cannot lock ref 'refs/heads/master': is at 42a0f6b6b6c7be56a469441b4bf29685c1cebac3 but expected 520e9b02c0d4678a2a5f41d2f561e6532fb98cc1)"
for _ in {1..10}; do git push --force origin master && break; sleep 5; done
# Turn off logging.
set +x
if [[ -n "${CLOUDFLARE_TOKEN}" ]]
then
sleep 1m
# https://api.cloudflare.com/#zone-purge-files-by-cache-tags,-host-or-prefix
POST_DATA='{"hosts":["clickhouse.com"]}'
curl -X POST "https://api.cloudflare.com/client/v4/zones/4fc6fb1d46e87851605aa7fa69ca6fe0/purge_cache" -H "Authorization: Bearer ${CLOUDFLARE_TOKEN}" -H "Content-Type:application/json" --data "${POST_DATA}"
fi
fi

View File

@ -10,7 +10,7 @@ sidebar_position: 10
This two-part meaning has two consequences:
- The only correct way to write "Click**H** house" is with a capital H.
- The only correct way to write "Click**H**ouse" is with a capital H.
- If you need an abbreviation, use "**CH**". For historical reasons, the abbreviation CK is also popular in China, mainly because one of the earliest Chinese-language talks about ClickHouse used that form.
!!! info "Fun fact"

View File

@ -3,6 +3,7 @@ package database
import (
"database/sql"
"fmt"
"net/url"
"strings"
"github.com/ClickHouse/ClickHouse/programs/diagnostics/internal/platform/data"
@ -17,7 +18,7 @@ type ClickhouseNativeClient struct {
func NewNativeClient(host string, port uint16, username string, password string) (*ClickhouseNativeClient, error) {
// debug output ?debug=true
connection, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%s@%s:%d/", username, password, host, port))
connection, err := sql.Open("clickhouse", fmt.Sprintf("clickhouse://%s:%s@%s:%d/", url.QueryEscape(username), url.QueryEscape(password), host, port))
if err != nil {
return &ClickhouseNativeClient{}, err
}

View File

@ -501,20 +501,28 @@
const server_address = document.getElementById('url').value;
const url = server_address +
var url = server_address +
(server_address.indexOf('?') >= 0 ? '&' : '?') +
/// Ask server to allow cross-domain requests.
'add_http_cors_header=1' +
'&user=' + encodeURIComponent(user) +
'&password=' + encodeURIComponent(password) +
'&default_format=JSONCompact' +
/// Safety settings to prevent results that browser cannot display.
'&max_result_rows=1000&max_result_bytes=10000000&result_overflow_mode=break';
// If play.html is opened locally, append username and password to the URL parameter to avoid CORS issue.
if (document.location.href.startsWith("file://")) {
url += '&user=' + encodeURIComponent(user) +
'&password=' + encodeURIComponent(password)
}
const xhr = new XMLHttpRequest;
xhr.open('POST', url, true);
// If play.html is open normally, use Basic auth to prevent username and password being exposed in URL parameters
if (!document.location.href.startsWith("file://")) {
xhr.setRequestHeader("Authorization", "Basic " + btoa(user+":"+password));
}
xhr.onreadystatechange = function()
{
if (posted_request_num != request_num) {

View File

@ -144,8 +144,8 @@ endif ()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/FunctionsLogical.cpp)
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/FunctionsLogical.h)
list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/FunctionsLogical.cpp Functions/indexHint.cpp)
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/FunctionsLogical.h Functions/indexHint.h)
list (APPEND dbms_sources
AggregateFunctions/IAggregateFunction.cpp
@ -488,7 +488,7 @@ if (TARGET ch_contrib::aws_s3)
endif()
if (TARGET ch_contrib::azure_sdk)
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::azure_sdk)
dbms_target_link_libraries (PRIVATE ch_contrib::azure_sdk)
endif()
if (TARGET ch_contrib::s2)
@ -611,4 +611,3 @@ if (ENABLE_TESTS)
add_check(unit_tests_dbms)
endif ()

View File

@ -263,11 +263,6 @@ public:
}
}
SerializationInfoPtr getSerializationInfo() const override
{
return data->getSerializationInfo();
}
bool isNullable() const override { return isColumnNullable(*data); }
bool onlyNull() const override { return data->isNullAt(0); }
bool isNumeric() const override { return data->isNumeric(); }

View File

@ -561,15 +561,4 @@ void ColumnTuple::getIndicesOfNonDefaultRows(Offsets & indices, size_t from, siz
return getIndicesOfNonDefaultRowsImpl<ColumnTuple>(indices, from, limit);
}
SerializationInfoPtr ColumnTuple::getSerializationInfo() const
{
MutableSerializationInfos infos;
infos.reserve(columns.size());
for (const auto & column : columns)
infos.push_back(const_pointer_cast<SerializationInfo>(column->getSerializationInfo()));
return std::make_shared<SerializationInfoTuple>(std::move(infos), SerializationInfo::Settings{});
}
}

View File

@ -102,7 +102,6 @@ public:
ColumnPtr compress() const override;
double getRatioOfDefaultRows(double sample_ratio) const override;
void getIndicesOfNonDefaultRows(Offsets & indices, size_t from, size_t limit) const override;
SerializationInfoPtr getSerializationInfo() const override;
size_t tupleSize() const { return columns.size(); }

View File

@ -64,11 +64,6 @@ ColumnPtr IColumn::createWithOffsets(const Offsets & offsets, const Field & defa
return res;
}
SerializationInfoPtr IColumn::getSerializationInfo() const
{
return std::make_shared<SerializationInfo>(ISerialization::getKind(*this), SerializationInfo::Settings{});
}
bool isColumnNullable(const IColumn & column)
{
return checkColumn<ColumnNullable>(column);

View File

@ -35,9 +35,6 @@ class ColumnGathererStream;
class Field;
class WeakHash32;
class SerializationInfo;
using SerializationInfoPtr = std::shared_ptr<const SerializationInfo>;
/*
* Represents a set of equal ranges in previous column to perform sorting in current column.
* Used in sorting by tuples.
@ -445,8 +442,6 @@ public:
/// Used to create full column from sparse.
[[nodiscard]] virtual Ptr createWithOffsets(const Offsets & offsets, const Field & default_field, size_t total_rows, size_t shift) const;
[[nodiscard]] virtual SerializationInfoPtr getSerializationInfo() const;
/// Compress column in memory to some representation that allows to decompress it back.
/// Return itself if compression is not applicable for this column type.
[[nodiscard]] virtual Ptr compress() const

View File

@ -111,7 +111,7 @@ public:
while (data + 8 <= end)
{
current_word = unalignedLoad<UInt64>(data);
current_word = unalignedLoadLE<UInt64>(data);
v3 ^= current_word;
SIPROUND;
@ -157,8 +157,8 @@ public:
void get128(char * out)
{
finalize();
unalignedStore<UInt64>(out, v0 ^ v1);
unalignedStore<UInt64>(out + 8, v2 ^ v3);
unalignedStoreLE<UInt64>(out, v0 ^ v1);
unalignedStoreLE<UInt64>(out + 8, v2 ^ v3);
}
template <typename T>

View File

@ -238,27 +238,36 @@ TaskStatsInfoGetter::TaskStatsInfoGetter()
if (netlink_socket_fd < 0)
throwFromErrno("Can't create PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
/// On some containerized environments, operation on Netlink socket could hang forever.
/// We set reasonably small timeout to overcome this issue.
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 50000;
if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv)))
throwFromErrno("Can't set timeout on PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
union
try
{
::sockaddr_nl addr{};
::sockaddr sockaddr;
};
addr.nl_family = AF_NETLINK;
/// On some containerized environments, operation on Netlink socket could hang forever.
/// We set reasonably small timeout to overcome this issue.
if (::bind(netlink_socket_fd, &sockaddr, sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
struct timeval tv;
tv.tv_sec = 0;
tv.tv_usec = 50000;
taskstats_family_id = getFamilyId(netlink_socket_fd);
if (0 != ::setsockopt(netlink_socket_fd, SOL_SOCKET, SO_RCVTIMEO, reinterpret_cast<const char *>(&tv), sizeof(tv)))
throwFromErrno("Can't set timeout on PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
union
{
::sockaddr_nl addr{};
::sockaddr sockaddr;
};
addr.nl_family = AF_NETLINK;
if (::bind(netlink_socket_fd, &sockaddr, sizeof(addr)) < 0)
throwFromErrno("Can't bind PF_NETLINK socket", ErrorCodes::NETLINK_ERROR);
taskstats_family_id = getFamilyId(netlink_socket_fd);
}
catch (...)
{
if (netlink_socket_fd >= 0)
close(netlink_socket_fd);
throw;
}
}
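The comments above carry the reasoning: on some containerized hosts a Netlink request can block forever, so the socket gets a 50 ms receive timeout, and the setup is now wrapped in try/catch so the descriptor is closed if anything after socket() throws. A small standalone sketch of setting such a receive timeout, shown on an ordinary UDP socket rather than PF_NETLINK for portability:

``` cpp
#include <cstdio>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

int main()
{
    int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0)
    {
        std::perror("socket");
        return 1;
    }

    // 50 ms receive timeout: recv()/recvmsg() fail with EAGAIN instead of hanging.
    timeval tv{};
    tv.tv_sec = 0;
    tv.tv_usec = 50000;
    if (0 != ::setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)))
    {
        std::perror("setsockopt(SO_RCVTIMEO)");
        ::close(fd);
        return 1;
    }

    ::close(fd);
    return 0;
}
```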

View File

@ -8,7 +8,8 @@ struct KeeperContext
enum class Phase : uint8_t
{
INIT,
RUNNING
RUNNING,
SHUTDOWN
};
Phase server_state{Phase::INIT};

View File

@ -20,6 +20,7 @@
#include <libnuraft/raft_server.hxx>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Common/ZooKeeper/ZooKeeperIO.h>
#include <Common/Stopwatch.h>
@ -107,8 +108,9 @@ KeeperServer::KeeperServer(
: server_id(configuration_and_settings_->server_id)
, coordination_settings(configuration_and_settings_->coordination_settings)
, log(&Poco::Logger::get("KeeperServer"))
, is_recovering(config.has("keeper_server.force_recovery") && config.getBool("keeper_server.force_recovery"))
, is_recovering(config.getBool("keeper_server.force_recovery", false))
, keeper_context{std::make_shared<KeeperContext>()}
, create_snapshot_on_exit(config.getBool("keeper_server.create_snapshot_on_exit", true))
{
if (coordination_settings->quorum_reads)
LOG_WARNING(log, "Quorum reads enabled, Keeper will work slower.");
@ -173,6 +175,17 @@ struct KeeperServer::KeeperRaftServer : public nuraft::raft_server
reconfigure(new_config);
}
void commit_in_bg() override
{
// For NuRaft, if any commit fails (uncaught exception) the whole server aborts as a safety
// This includes failed allocation which can produce an unknown state for the storage,
// making it impossible to handle correctly.
// We block the memory tracker for all the commit operations (including KeeperStateMachine::commit)
// assuming that the allocations are small
LockMemoryExceptionInThread blocker{VariableContext::Global};
nuraft::raft_server::commit_in_bg();
}
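The comment block explains why: NuRaft aborts the whole server if a commit throws, so memory-limit exceptions are suppressed for the duration of the commit, on the assumption that commit allocations are small. A rough standalone sketch of this thread-local RAII blocker pattern (hypothetical names and a toy limit hook, not ClickHouse's LockMemoryExceptionInThread):

``` cpp
#include <new>

// Depth counter rather than a bool so nested blockers compose correctly.
thread_local int memory_exception_block_depth = 0;

struct MemoryExceptionBlocker
{
    MemoryExceptionBlocker() { ++memory_exception_block_depth; }
    ~MemoryExceptionBlocker() { --memory_exception_block_depth; }
};

// Called by the memory tracker when a soft limit is exceeded.
void onSoftMemoryLimitExceeded()
{
    if (memory_exception_block_depth > 0)
        return;                 // caller cannot survive an exception here, so only account for it
    throw std::bad_alloc();     // normal path: signal the limit breach
}

void commitInBackground()
{
    MemoryExceptionBlocker blocker;     // scope-wide: covers everything the commit allocates
    onSoftMemoryLimitExceeded();        // would throw without the blocker; here it just returns
    // ... apply log entries to the state machine ...
}

int main()
{
    commitInBackground();
    return 0;
}
```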
using nuraft::raft_server::raft_server;
// peers are initially marked as responding because at least one cycle
@ -367,6 +380,12 @@ void KeeperServer::shutdownRaftServer()
}
raft_instance->shutdown();
keeper_context->server_state = KeeperContext::Phase::SHUTDOWN;
if (create_snapshot_on_exit)
raft_instance->create_snapshot();
raft_instance.reset();
if (asio_listener)

View File

@ -64,6 +64,8 @@ private:
std::shared_ptr<KeeperContext> keeper_context;
const bool create_snapshot_on_exit;
public:
KeeperServer(
const KeeperConfigurationAndSettingsPtr & settings_,

View File

@ -395,7 +395,14 @@ void KeeperStateMachine::create_snapshot(nuraft::snapshot & s, nuraft::async_res
};
LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flash to disk", s.get_last_log_idx());
if (keeper_context->server_state == KeeperContext::Phase::SHUTDOWN)
{
LOG_INFO(log, "Creating a snapshot during shutdown because 'create_snapshot_on_exit' is enabled.");
snapshot_task.create_snapshot(std::move(snapshot_task.snapshot));
return;
}
LOG_DEBUG(log, "In memory snapshot {} created, queueing task to flush to disk", s.get_last_log_idx());
/// Flush snapshot to disk in a separate thread.
if (!snapshots_queue.push(std::move(snapshot_task)))
LOG_WARNING(log, "Cannot push snapshot task into queue");

View File

@ -14,7 +14,7 @@
#include <Common/hex.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Common/LockMemoryExceptionInThread.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <sstream>
@ -2127,7 +2127,7 @@ void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
// if an exception occurs during rollback, the best option is to terminate because we can end up in an inconsistent state
// we block memory tracking so we can avoid terminating if we're rollbacking because of memory limit
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits;
LockMemoryExceptionInThread blocker{VariableContext::Global};
try
{
uncommitted_transactions.pop_back();

View File

@ -557,6 +557,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, query_plan_enable_optimizations, true, "Apply optimizations to query plan", 0) \
M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
M(Bool, query_plan_optimize_primary_key, true, "Analyze primary key using query plan (instead of AST)", 0) \
M(UInt64, regexp_max_matches_per_row, 1000, "Max matches of any single regexp per row, used to safeguard 'extractAllGroupsHorizontal' against consuming too much memory with greedy RE.", 0) \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \

View File

@ -119,7 +119,6 @@ void DataTypeFactory::registerDataType(const String & family_name, Value creator
throw Exception("DataTypeFactory: the data type family name '" + family_name + "' is not unique",
ErrorCodes::LOGICAL_ERROR);
if (case_sensitiveness == CaseInsensitive
&& !case_insensitive_data_types.emplace(family_name_lowercase, creator).second)
throw Exception("DataTypeFactory: the case insensitive data type family name '" + family_name + "' is not unique",

View File

@ -25,7 +25,7 @@ class DataTypeFactory final : private boost::noncopyable, public IFactoryWithAli
private:
using SimpleCreator = std::function<DataTypePtr()>;
using DataTypesDictionary = std::unordered_map<String, Value>;
using CreatorWithCustom = std::function<std::pair<DataTypePtr,DataTypeCustomDescPtr>(const ASTPtr & parameters)>;
using CreatorWithCustom = std::function<std::pair<DataTypePtr, DataTypeCustomDescPtr>(const ASTPtr & parameters)>;
using SimpleCreatorWithCustom = std::function<std::pair<DataTypePtr,DataTypeCustomDescPtr>()>;
public:

View File

@ -2,6 +2,7 @@
#include <base/range.h>
#include <Common/StringUtils/StringUtils.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnConst.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
@ -270,6 +271,7 @@ size_t DataTypeTuple::getSizeOfValueInMemory() const
SerializationPtr DataTypeTuple::doGetDefaultSerialization() const
{
SerializationTuple::ElementSerializations serializations(elems.size());
for (size_t i = 0; i < elems.size(); ++i)
{
String elem_name = have_explicit_names ? names[i] : toString(i + 1);
@ -302,7 +304,27 @@ MutableSerializationInfoPtr DataTypeTuple::createSerializationInfo(const Seriali
for (const auto & elem : elems)
infos.push_back(elem->createSerializationInfo(settings));
return std::make_shared<SerializationInfoTuple>(std::move(infos), settings);
return std::make_shared<SerializationInfoTuple>(std::move(infos), names, settings);
}
SerializationInfoPtr DataTypeTuple::getSerializationInfo(const IColumn & column) const
{
if (const auto * column_const = checkAndGetColumn<ColumnConst>(&column))
return getSerializationInfo(column_const->getDataColumn());
MutableSerializationInfos infos;
infos.reserve(elems.size());
const auto & column_tuple = assert_cast<const ColumnTuple &>(column);
assert(elems.size() == column_tuple.getColumns().size());
for (size_t i = 0; i < elems.size(); ++i)
{
auto element_info = elems[i]->getSerializationInfo(column_tuple.getColumn(i));
infos.push_back(const_pointer_cast<SerializationInfo>(element_info));
}
return std::make_shared<SerializationInfoTuple>(std::move(infos), names, SerializationInfo::Settings{});
}

View File

@ -23,6 +23,7 @@ private:
DataTypes elems;
Strings names;
bool have_explicit_names;
public:
static constexpr bool is_parametric = true;
@ -55,6 +56,7 @@ public:
SerializationPtr doGetDefaultSerialization() const override;
SerializationPtr getSerialization(const SerializationInfo & info) const override;
MutableSerializationInfoPtr createSerializationInfo(const SerializationInfo::Settings & settings) const override;
SerializationInfoPtr getSerializationInfo(const IColumn & column) const override;
const DataTypePtr & getElement(size_t i) const { return elems[i]; }
const DataTypes & getElements() const { return elems; }

View File

@ -179,12 +179,19 @@ void IDataType::setCustomization(DataTypeCustomDescPtr custom_desc_) const
custom_serialization = std::move(custom_desc_->serialization);
}
MutableSerializationInfoPtr IDataType::createSerializationInfo(
const SerializationInfo::Settings & settings) const
MutableSerializationInfoPtr IDataType::createSerializationInfo(const SerializationInfo::Settings & settings) const
{
return std::make_shared<SerializationInfo>(ISerialization::Kind::DEFAULT, settings);
}
SerializationInfoPtr IDataType::getSerializationInfo(const IColumn & column) const
{
if (const auto * column_const = checkAndGetColumn<ColumnConst>(&column))
return getSerializationInfo(column_const->getDataColumn());
return std::make_shared<SerializationInfo>(ISerialization::getKind(column), SerializationInfo::Settings{});
}
SerializationPtr IDataType::getDefaultSerialization() const
{
if (custom_serialization)

View File

@ -101,8 +101,8 @@ public:
Names getSubcolumnNames() const;
virtual MutableSerializationInfoPtr createSerializationInfo(
const SerializationInfo::Settings & settings) const;
virtual MutableSerializationInfoPtr createSerializationInfo(const SerializationInfo::Settings & settings) const;
virtual SerializationInfoPtr getSerializationInfo(const IColumn & column) const;
/// TODO: support more types.
virtual bool supportsSparseSerialization() const { return !haveSubtypes(); }

View File

@ -224,8 +224,10 @@ String ISerialization::getSubcolumnNameForStream(const SubstreamPath & path, siz
void ISerialization::addToSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path, ColumnPtr column)
{
if (cache && !path.empty())
cache->emplace(getSubcolumnNameForStream(path), column);
if (!cache || path.empty())
return;
cache->emplace(getSubcolumnNameForStream(path), column);
}
ColumnPtr ISerialization::getFromSubstreamsCache(SubstreamsCache * cache, const SubstreamPath & path)
@ -234,10 +236,7 @@ ColumnPtr ISerialization::getFromSubstreamsCache(SubstreamsCache * cache, const
return nullptr;
auto it = cache->find(getSubcolumnNameForStream(path));
if (it == cache->end())
return nullptr;
return it->second;
return it == cache->end() ? nullptr : it->second;
}
bool ISerialization::isSpecialCompressionAllowed(const SubstreamPath & path)

View File

@ -1,9 +1,9 @@
#include <DataTypes/Serializations/SerializationInfo.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnSparse.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/VarInt.h>
#include <Core/Block.h>
#include <base/EnumReflection.h>
#include <Poco/JSON/JSON.h>
@ -47,12 +47,25 @@ void SerializationInfo::Data::add(const Data & other)
num_defaults += other.num_defaults;
}
void SerializationInfo::Data::addDefaults(size_t length)
{
num_rows += length;
num_defaults += length;
}
SerializationInfo::SerializationInfo(ISerialization::Kind kind_, const Settings & settings_)
: settings(settings_)
, kind(kind_)
{
}
SerializationInfo::SerializationInfo(ISerialization::Kind kind_, const Settings & settings_, const Data & data_)
: settings(settings_)
, kind(kind_)
, data(data_)
{
}
void SerializationInfo::add(const IColumn & column)
{
data.add(column);
@ -67,6 +80,13 @@ void SerializationInfo::add(const SerializationInfo & other)
kind = chooseKind(data, settings);
}
void SerializationInfo::addDefaults(size_t length)
{
data.addDefaults(length);
if (settings.choose_kind)
kind = chooseKind(data, settings);
}
void SerializationInfo::replaceData(const SerializationInfo & other)
{
data = other.data;
@ -74,9 +94,7 @@ void SerializationInfo::replaceData(const SerializationInfo & other)
MutableSerializationInfoPtr SerializationInfo::clone() const
{
auto res = std::make_shared<SerializationInfo>(kind, settings);
res->data = data;
return res;
return std::make_shared<SerializationInfo>(kind, settings, data);
}
void SerializationInfo::serialializeKindBinary(WriteBuffer & out) const
@ -221,13 +239,8 @@ void SerializationInfoByName::readJSON(ReadBuffer & in)
"Missed field '{}' in SerializationInfo of columns", KEY_NAME);
auto name = elem_object->getValue<String>(KEY_NAME);
auto it = find(name);
if (it == end())
throw Exception(ErrorCodes::CORRUPTED_DATA,
"There is no column {} in serialization infos", name);
it->second->fromJSON(*elem_object);
if (auto it = find(name); it != end())
it->second->fromJSON(*elem_object);
}
}
}

View File

@ -34,6 +34,7 @@ public:
void add(const IColumn & column);
void add(const Data & other);
void addDefaults(size_t length);
};
struct Settings
@ -45,6 +46,7 @@ public:
};
SerializationInfo(ISerialization::Kind kind_, const Settings & settings_);
SerializationInfo(ISerialization::Kind kind_, const Settings & settings_, const Data & data_);
virtual ~SerializationInfo() = default;
@ -52,7 +54,9 @@ public:
virtual void add(const IColumn & column);
virtual void add(const SerializationInfo & other);
virtual void addDefaults(size_t length);
virtual void replaceData(const SerializationInfo & other);
virtual std::shared_ptr<SerializationInfo> clone() const;
virtual void serialializeKindBinary(WriteBuffer & out) const;
@ -61,6 +65,7 @@ public:
virtual Poco::JSON::Object toJSON() const;
virtual void fromJSON(const Poco::JSON::Object & object);
void setKind(ISerialization::Kind kind_) { kind = kind_; }
const Settings & getSettings() const { return settings; }
const Data & getData() const { return data; }
ISerialization::Kind getKind() const { return kind; }

View File

@ -13,10 +13,14 @@ namespace ErrorCodes
}
SerializationInfoTuple::SerializationInfoTuple(
MutableSerializationInfos elems_, const Settings & settings_)
MutableSerializationInfos elems_, Names names_, const Settings & settings_)
: SerializationInfo(ISerialization::Kind::DEFAULT, settings_)
, elems(std::move(elems_))
, names(std::move(names_))
{
assert(names.size() == elems.size());
for (size_t i = 0; i < names.size(); ++i)
name_to_elem[names[i]] = elems[i];
}
bool SerializationInfoTuple::hasCustomSerialization() const
@ -40,22 +44,34 @@ void SerializationInfoTuple::add(const SerializationInfo & other)
{
SerializationInfo::add(other);
const auto & info_tuple = assert_cast<const SerializationInfoTuple &>(other);
assert(elems.size() == info_tuple.elems.size());
const auto & other_info = assert_cast<const SerializationInfoTuple &>(other);
for (const auto & [name, elem] : name_to_elem)
{
auto it = other_info.name_to_elem.find(name);
if (it != other_info.name_to_elem.end())
elem->add(*it->second);
else
elem->addDefaults(other_info.getData().num_rows);
}
}
for (size_t i = 0; i < elems.size(); ++i)
elems[i]->add(*info_tuple.elems[i]);
void SerializationInfoTuple::addDefaults(size_t length)
{
for (const auto & elem : elems)
elem->addDefaults(length);
}
void SerializationInfoTuple::replaceData(const SerializationInfo & other)
{
SerializationInfo::add(other);
const auto & info_tuple = assert_cast<const SerializationInfoTuple &>(other);
assert(elems.size() == info_tuple.elems.size());
for (size_t i = 0; i < elems.size(); ++i)
elems[i]->replaceData(*info_tuple.elems[i]);
const auto & other_info = assert_cast<const SerializationInfoTuple &>(other);
for (const auto & [name, elem] : name_to_elem)
{
auto it = other_info.name_to_elem.find(name);
if (it != other_info.name_to_elem.end())
elem->replaceData(*it->second);
}
}
MutableSerializationInfoPtr SerializationInfoTuple::clone() const
@ -65,7 +81,7 @@ MutableSerializationInfoPtr SerializationInfoTuple::clone() const
for (const auto & elem : elems)
elems_cloned.push_back(elem->clone());
return std::make_shared<SerializationInfoTuple>(std::move(elems_cloned), settings);
return std::make_shared<SerializationInfoTuple>(std::move(elems_cloned), names, settings);
}
void SerializationInfoTuple::serialializeKindBinary(WriteBuffer & out) const
@ -99,7 +115,7 @@ void SerializationInfoTuple::fromJSON(const Poco::JSON::Object & object)
if (!object.has("subcolumns"))
throw Exception(ErrorCodes::CORRUPTED_DATA,
"Missed field '{}' in SerializationInfo of columns SerializationInfoTuple");
"Missed field 'subcolumns' in SerializationInfo of columns SerializationInfoTuple");
auto subcolumns = object.getArray("subcolumns");
if (elems.size() != subcolumns->size())

View File

@ -1,4 +1,5 @@
#pragma once
#include <Core/Names.h>
#include <DataTypes/Serializations/SerializationInfo.h>
namespace DB
@ -7,25 +8,32 @@ namespace DB
class SerializationInfoTuple : public SerializationInfo
{
public:
SerializationInfoTuple(MutableSerializationInfos elems_, const Settings & settings_);
SerializationInfoTuple(MutableSerializationInfos elems_, Names names_, const Settings & settings_);
bool hasCustomSerialization() const override;
void add(const IColumn & column) override;
void add(const SerializationInfo & other) override;
void addDefaults(size_t length) override;
void replaceData(const SerializationInfo & other) override;
MutableSerializationInfoPtr clone() const override;
void serialializeKindBinary(WriteBuffer & out) const override;
void deserializeFromKindsBinary(ReadBuffer & in) override;
Poco::JSON::Object toJSON() const override;
void fromJSON(const Poco::JSON::Object & object) override;
MutableSerializationInfoPtr getElementInfo(size_t i) const { return elems[i]; }
const MutableSerializationInfoPtr & getElementInfo(size_t i) const { return elems[i]; }
ISerialization::Kind getElementKind(size_t i) const { return elems[i]->getKind(); }
private:
MutableSerializationInfos elems;
Names names;
using NameToElem = std::unordered_map<String, MutableSerializationInfoPtr>;
NameToElem name_to_elem;
};
}

View File

@ -8,6 +8,7 @@
#include "IDictionarySource.h"
#include "ExternalQueryBuilder.h"
#include <Core/Block.h>
#include <Interpreters/Context_fwd.h>
#include <Poco/Logger.h>
#include <mutex>

View File

@ -284,6 +284,9 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
else
{
context = Context::createCopy(global_context);
if (created_from_ddl)
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
}
context->applySettingsChanges(readSettingsFromDictionaryConfig(config, config_prefix));

View File

@ -32,8 +32,7 @@ HTTPDictionarySource::HTTPDictionarySource(
const Configuration & configuration_,
const Poco::Net::HTTPBasicCredentials & credentials_,
Block & sample_block_,
ContextPtr context_,
bool created_from_ddl)
ContextPtr context_)
: log(&Poco::Logger::get("HTTPDictionarySource"))
, update_time(std::chrono::system_clock::from_time_t(0))
, dict_struct(dict_struct_)
@ -42,9 +41,6 @@ HTTPDictionarySource::HTTPDictionarySource(
, context(context_)
, timeouts(ConnectionTimeouts::getHTTPTimeouts(context))
{
if (created_from_ddl)
context->getRemoteHostFilter().checkURL(Poco::URI(configuration.url));
credentials.setUsername(credentials_.getUsername());
credentials.setPassword(credentials_.getPassword());
}
@ -303,7 +299,10 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context, created_from_ddl);
if (created_from_ddl)
context->getRemoteHostFilter().checkURL(Poco::URI(configuration.url));
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context);
};
factory.registerSource("http", create_table_source);
}

View File

@ -37,8 +37,7 @@ public:
const Configuration & configuration,
const Poco::Net::HTTPBasicCredentials & credentials_,
Block & sample_block_,
ContextPtr context_,
bool created_from_ddl);
ContextPtr context_);
HTTPDictionarySource(const HTTPDictionarySource & other);
HTTPDictionarySource & operator=(const HTTPDictionarySource &) = delete;

View File

@ -20,7 +20,7 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
Block & sample_block,
ContextPtr context,
const std::string & /* default_database */,
bool /* created_from_ddl */)
bool created_from_ddl)
{
const auto config_prefix = root_config_prefix + ".mongodb";
ExternalDataSourceConfiguration configuration;
@ -39,6 +39,9 @@ void registerDictionarySourceMongoDB(DictionarySourceFactory & factory)
configuration.database = config.getString(config_prefix + ".db", "");
}
if (created_from_ddl)
context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
return std::make_unique<MongoDBDictionarySource>(dict_struct,
config.getString(config_prefix + ".uri", ""),
configuration.host,

View File

@ -78,6 +78,9 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
: std::nullopt;
if (named_collection)
{
if (created_from_ddl)
global_context->getRemoteHostFilter().checkHostAndPort(configuration.host, toString(configuration.port));
mysql_settings.applyChanges(named_collection->settings_changes);
configuration.set(named_collection->configuration);
configuration.addresses = {std::make_pair(configuration.host, configuration.port)};
@ -90,6 +93,12 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
}
else
{
if (created_from_ddl)
{
for (auto & address : configuration.addresses)
global_context->getRemoteHostFilter().checkHostAndPort(address.first, toString(address.second));
}
configuration.database = config.getString(settings_config_prefix + ".db", "");
configuration.table = config.getString(settings_config_prefix + ".table", "");
pool = std::make_shared<mysqlxx::PoolWithFailover>(mysqlxx::PoolFactory::instance().get(config, settings_config_prefix));

View File

@ -185,13 +185,21 @@ void registerDictionarySourcePostgreSQL(DictionarySourceFactory & factory)
Block & sample_block,
ContextPtr context,
const std::string & /* default_database */,
bool /* created_from_ddl */) -> DictionarySourcePtr
[[maybe_unused]] bool created_from_ddl) -> DictionarySourcePtr
{
#if USE_LIBPQXX
const auto settings_config_prefix = config_prefix + ".postgresql";
auto has_config_key = [](const String & key) { return dictionary_allowed_keys.contains(key) || key.starts_with("replica"); };
auto configuration = getExternalDataSourceConfigurationByPriority(config, settings_config_prefix, context, has_config_key);
const auto & settings = context->getSettingsRef();
if (created_from_ddl)
{
for (const auto & replicas : configuration.replicas_configurations)
for (const auto & replica : replicas.second)
context->getRemoteHostFilter().checkHostAndPort(replica.host, toString(replica.port));
}
auto pool = std::make_shared<postgres::PoolWithFailover>(
configuration.replicas_configurations,
settings.postgresql_connection_pool_size,

View File

@ -99,6 +99,12 @@ public:
void syncRevision(UInt64 revision) override { delegate->syncRevision(revision); }
UInt64 getRevision() const override { return delegate->getRevision(); }
bool supportsStat() const override { return delegate->supportsStat(); }
struct stat stat(const String & path) const override { return delegate->stat(path); }
bool supportsChmod() const override { return delegate->supportsChmod(); }
void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); }
protected:
Executor & getExecutor() override;

View File

@ -11,6 +11,8 @@
#include <fstream>
#include <unistd.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <Disks/DiskFactory.h>
#include <Disks/DiskMemory.h>
@ -39,6 +41,7 @@ namespace ErrorCodes
extern const int CANNOT_UNLINK;
extern const int CANNOT_RMDIR;
extern const int BAD_ARGUMENTS;
extern const int CANNOT_STAT;
}
std::mutex DiskLocal::reservation_mutex;
@ -671,6 +674,23 @@ bool DiskLocal::setup()
return true;
}
struct stat DiskLocal::stat(const String & path) const
{
struct stat st;
auto full_path = fs::path(disk_path) / path;
if (::stat(full_path.string().c_str(), &st) == 0)
return st;
DB::throwFromErrnoWithPath("Cannot stat file: " + path, path, DB::ErrorCodes::CANNOT_STAT);
}
void DiskLocal::chmod(const String & path, mode_t mode)
{
auto full_path = fs::path(disk_path) / path;
if (::chmod(full_path.string().c_str(), mode) == 0)
return;
DB::throwFromErrnoWithPath("Cannot chmod file: " + path, path, DB::ErrorCodes::PATH_ACCESS_DENIED);
}
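DiskLocal implements the new stat()/chmod() disk methods as thin wrappers over the POSIX calls of the same name, reporting errno on failure. For reference, a small standalone sketch of that underlying pattern (plain POSIX, hypothetical path, no ClickHouse types):

``` cpp
#include <cerrno>
#include <cstdio>
#include <cstring>
#include <sys/stat.h>

int main()
{
    const char * path = "/tmp/example_file";   // hypothetical path

    struct stat st;
    if (::stat(path, &st) != 0)
    {
        std::fprintf(stderr, "Cannot stat %s: %s\n", path, std::strerror(errno));
        return 1;
    }

    // Add group write permission on top of whatever the file already has.
    if (::chmod(path, st.st_mode | S_IWGRP) != 0)
    {
        std::fprintf(stderr, "Cannot chmod %s: %s\n", path, std::strerror(errno));
        return 1;
    }

    return 0;
}
```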
void registerDiskLocal(DiskFactory & factory)
{
auto creator = [](const String & name,

View File

@ -122,6 +122,12 @@ public:
bool canRead() const noexcept;
bool canWrite() const noexcept;
bool supportsStat() const override { return true; }
struct stat stat(const String & path) const override;
bool supportsChmod() const override { return true; }
void chmod(const String & path, mode_t mode) override;
private:
std::optional<UInt64> tryReserve(UInt64 bytes);

View File

@ -6,6 +6,7 @@
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -172,7 +173,19 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
StoredObjects objects;
objects.emplace_back(remote_path, iter->second.size);
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, objects, getContext(), read_settings);
auto read_buffer_creator =
[this, read_settings]
(const std::string & path_, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_shared<ReadBufferFromWebServer>(
fs::path(url) / path_,
getContext(),
read_settings,
/* use_external_buffer */true,
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{

View File

@ -112,6 +112,11 @@ public:
disk.setLastModified(path, timestamp);
}
void chmod(const String & path, mode_t mode) override
{
disk.chmod(path, mode);
}
void setReadOnly(const std::string & path) override
{
disk.setReadOnly(path);

View File

@ -351,6 +351,12 @@ public:
getType());
}
virtual bool supportsStat() const { return false; }
virtual struct stat stat(const String & /*path*/) const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk does not support stat"); }
virtual bool supportsChmod() const { return false; }
virtual void chmod(const String & /*path*/, mode_t /*mode*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk does not support chmod"); }
protected:
friend class DiskDecorator;

View File

@ -103,6 +103,9 @@ public:
/// Set last modified time to file or directory at `path`.
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
/// Just chmod.
virtual void chmod(const String & path, mode_t mode) = 0;
/// Set file at `path` as read-only.
virtual void setReadOnly(const std::string & path) = 0;

View File

@ -2,7 +2,7 @@
#if USE_AZURE_BLOB_STORAGE
#include <IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <IO/ReadBufferFromString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>

View File

@ -1,28 +1,13 @@
#include "ReadBufferFromRemoteFSGather.h"
#include <IO/SeekableReadBuffer.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#if USE_AWS_S3
#include <IO/ReadBufferFromS3.h>
#endif
#if USE_AZURE_BLOB_STORAGE
#include <IO/ReadBufferFromAzureBlobStorage.h>
#endif
#if USE_HDFS
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#endif
#include <Disks/IO/CachedReadBufferFromRemoteFS.h>
#include <Common/logger_useful.h>
#include <filesystem>
#include <iostream>
#include <Common/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
namespace fs = std::filesystem;
#include <Interpreters/Context.h>
namespace DB
@ -33,97 +18,12 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
{
if (!current_file_path.empty() && !with_cache && enable_cache_log)
{
appendFilesystemCacheLog();
}
current_file_path = path;
current_file_size = file_size;
total_bytes_read_from_current_file = 0;
return createImplementationBufferImpl(path, file_size);
}
#if USE_AWS_S3
SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBufferImpl(const String & path, size_t file_size)
{
auto remote_file_reader_creator = [=, this]()
{
return std::make_unique<ReadBufferFromS3>(
client_ptr,
bucket,
path,
version_id,
max_single_read_retries,
settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
};
if (with_cache)
{
return std::make_shared<CachedReadBufferFromRemoteFS>(
path,
settings.remote_fs_cache,
remote_file_reader_creator,
settings,
query_id,
read_until_position ? read_until_position : file_size);
}
return remote_file_reader_creator();
}
#endif
#if USE_AZURE_BLOB_STORAGE
SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
blob_container_client,
path,
settings,
max_single_read_retries,
max_single_download_retries,
/* use_external_buffer */true,
read_until_position);
}
#endif
SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
{
return std::make_unique<ReadBufferFromWebServer>(
fs::path(uri) / path,
context,
settings,
/* use_external_buffer */true,
read_until_position);
}
#if USE_HDFS
SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBufferImpl(const String & path, size_t /* file_size */)
{
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
auto hdfs_uri = path.substr(0, begin_of_path);
LOG_TEST(log, "HDFS uri: {}, path: {}", hdfs_path, hdfs_uri);
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, settings);
}
#endif
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
@ -138,6 +38,33 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const String & path, size_t file_size)
{
if (!current_file_path.empty() && !with_cache && enable_cache_log)
{
appendFilesystemCacheLog();
}
current_file_path = path;
current_file_size = file_size;
total_bytes_read_from_current_file = 0;
size_t current_read_until_position = read_until_position ? read_until_position : file_size;
auto current_read_buffer_creator = [path, current_read_until_position, this]() { return read_buffer_creator(path, current_read_until_position); };
if (with_cache)
{
return std::make_shared<CachedReadBufferFromRemoteFS>(
path,
settings.remote_fs_cache,
current_read_buffer_creator,
settings,
query_id,
current_read_until_position);
}
return current_read_buffer_creator();
}
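This is the core of the Disks/IO refactoring in this merge: instead of one ReadBufferFromRemoteFSGather subclass per backend (S3, Azure, HDFS, web server), the gather buffer takes a creator callback and invokes it lazily for each stored object, optionally wrapping the result in the filesystem cache. A simplified standalone sketch of that factory-callback shape (Reader, StoredObject and Gather are toy stand-ins, not the ClickHouse classes):

``` cpp
#include <cstdio>
#include <functional>
#include <memory>
#include <string>
#include <vector>

// Stand-in for a seekable read buffer over one remote object.
struct Reader
{
    explicit Reader(std::string path_) : path(std::move(path_)) {}
    std::string path;
};

// One blob of a file that was split across remote objects.
struct StoredObject
{
    std::string remote_path;
    size_t size = 0;
};

class Gather
{
public:
    using ReaderCreator = std::function<std::shared_ptr<Reader>(const std::string & path, size_t read_until_position)>;

    Gather(ReaderCreator creator_, std::vector<StoredObject> objects_)
        : creator(std::move(creator_)), objects(std::move(objects_)) {}

    // The gather object decides *when* to open each blob; the backend decides *how*.
    void readAll()
    {
        for (const auto & object : objects)
        {
            auto reader = creator(object.remote_path, object.size);
            std::printf("reading %s (%zu bytes)\n", reader->path.c_str(), object.size);
        }
    }

private:
    ReaderCreator creator;
    std::vector<StoredObject> objects;
};

int main()
{
    // Each object storage supplies its own creator lambda, e.g. an "S3-like" one here.
    auto creator = [](const std::string & path, size_t /*read_until_position*/)
    {
        return std::make_shared<Reader>("s3://bucket/" + path);
    };

    Gather gather(creator, {{"blob_0", 1024}, {"blob_1", 2048}});
    gather.readAll();
    return 0;
}
```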
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
@ -156,7 +83,6 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
cache_log->add(elem);
}
IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
{
/**
@ -178,7 +104,6 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
return {0, 0};
}
void ReadBufferFromRemoteFSGather::initialize()
{
/// One clickhouse file can be split into multiple files in remote fs.
@ -206,7 +131,6 @@ void ReadBufferFromRemoteFSGather::initialize()
current_buf = nullptr;
}
bool ReadBufferFromRemoteFSGather::nextImpl()
{
/// Find first available buffer that fits to given offset.
@ -232,7 +156,6 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
return readImpl();
}
bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
{
/// If there is no available buffers - nothing to read.
@ -247,7 +170,6 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
return true;
}
bool ReadBufferFromRemoteFSGather::readImpl()
{
swap(*current_buf);
@ -294,13 +216,11 @@ bool ReadBufferFromRemoteFSGather::readImpl()
return result;
}
size_t ReadBufferFromRemoteFSGather::getFileOffsetOfBufferEnd() const
{
return file_offset_of_buffer_end;
}
void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
{
if (position != read_until_position)
@ -310,7 +230,6 @@ void ReadBufferFromRemoteFSGather::setReadUntilPosition(size_t position)
}
}
void ReadBufferFromRemoteFSGather::reset()
{
current_buf.reset();
@ -321,7 +240,6 @@ String ReadBufferFromRemoteFSGather::getFileName() const
return current_file_path;
}
size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;

View File

@ -6,12 +6,6 @@
#include <IO/AsynchronousReader.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp>
#endif
namespace Aws { namespace S3 { class S3Client; } }
namespace Poco { class Logger; }
namespace DB
@ -21,12 +15,15 @@ namespace DB
* Remote disk might need to split one clickhouse file into multiple files in remote fs.
* This class works like a proxy to allow transition from one file into multiple.
*/
class ReadBufferFromRemoteFSGather : public ReadBuffer
class ReadBufferFromRemoteFSGather final : public ReadBuffer
{
friend class ReadIndirectBufferFromRemoteFS;
public:
using ReadBufferCreator = std::function<std::shared_ptr<ReadBufferFromFileBase>(const std::string & path, size_t read_until_position)>;
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
@ -50,8 +47,20 @@ public:
size_t getImplementationBufferOffset() const;
protected:
virtual SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) = 0;
private:
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size);
bool nextImpl() override;
void initialize();
bool readImpl();
bool moveToNextBuffer();
void appendFilesystemCacheLog();
ReadBufferCreator read_buffer_creator;
StoredObjects blobs_to_read;
@ -68,19 +77,6 @@ protected:
Poco::Logger * log;
private:
SeekableReadBufferPtr createImplementationBuffer(const String & path, size_t file_size);
bool nextImpl() override;
void initialize();
bool readImpl();
bool moveToNextBuffer();
void appendFilesystemCacheLog();
SeekableReadBufferPtr current_buf;
size_t current_buf_idx = 0;
@ -99,108 +95,4 @@ private:
bool enable_cache_log = false;
};
#if USE_AWS_S3
/// Reads data from S3 using stored paths in metadata.
class ReadBufferFromS3Gather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromS3Gather(
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & version_id_,
const StoredObjects & blobs_to_read_,
size_t max_single_read_retries_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
, client_ptr(std::move(client_ptr_))
, bucket(bucket_)
, version_id(version_id_)
, max_single_read_retries(max_single_read_retries_)
{
}
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
private:
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
String bucket;
String version_id;
UInt64 max_single_read_retries;
};
#endif
#if USE_AZURE_BLOB_STORAGE
/// Reads data from AzureBlob Storage using paths stored in metadata.
class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromAzureBlobStorageGather(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const StoredObjects & blobs_to_read_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
, blob_container_client(blob_container_client_)
, max_single_read_retries(max_single_read_retries_)
, max_single_download_retries(max_single_download_retries_)
{
}
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
private:
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client;
size_t max_single_read_retries;
size_t max_single_download_retries;
};
#endif
class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromWebServerGather(
const String & uri_,
const StoredObjects & blobs_to_read_,
ContextPtr context_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
, uri(uri_)
, context(context_)
{
}
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
private:
String uri;
ContextPtr context;
};
#if USE_HDFS
/// Reads data from HDFS using stored paths in metadata.
class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather
{
public:
ReadBufferFromHDFSGather(
const Poco::Util::AbstractConfiguration & config_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
, config(config_)
{
}
SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) override;
private:
const Poco::Util::AbstractConfiguration & config;
};
#endif
}

View File

@ -27,7 +27,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer(
const ReadSettings & settings_,
bool use_external_buffer_,
size_t read_until_position_)
: SeekableReadBuffer(nullptr, 0)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, log(&Poco::Logger::get("ReadBufferFromWebServer"))
, context(context_)
, url(url_)

View File

@ -1,6 +1,6 @@
#pragma once
#include <IO/SeekableReadBuffer.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/ReadSettings.h>
#include <Interpreters/Context.h>
@ -15,7 +15,7 @@ namespace DB
*
* Usage: ReadIndirectBufferFromRemoteFS -> SeekAvoidingReadBuffer -> ReadBufferFromWebServer -> ReadWriteBufferFromHTTP.
*/
class ReadBufferFromWebServer : public SeekableReadBuffer
class ReadBufferFromWebServer : public ReadBufferFromFileBase
{
public:
explicit ReadBufferFromWebServer(
@ -33,6 +33,8 @@ public:
size_t getFileOffsetOfBufferEnd() const override { return offset; }
String getFileName() const override { return url; }
private:
std::unique_ptr<ReadBuffer> initialize();

View File

@ -2,7 +2,7 @@
#if USE_AZURE_BLOB_STORAGE
#include <IO/WriteBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
#include <Common/Throttler.h>

View File

@ -1,11 +1,5 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#include <IO/WriteBufferFromS3.h>
#include <IO/WriteBufferFromAzureBlobStorage.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteBufferFromHTTP.h>
namespace DB
{

View File

@ -3,8 +3,8 @@
#if USE_AZURE_BLOB_STORAGE
#include <Common/getRandomASCIIString.h>
#include <IO/ReadBufferFromAzureBlobStorage.h>
#include <IO/WriteBufferFromAzureBlobStorage.h>
#include <Disks/IO/ReadBufferFromAzureBlobStorage.h>
#include <Disks/IO/WriteBufferFromAzureBlobStorage.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
@ -79,11 +79,24 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
client.get(),
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(const std::string & path, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(),
path,
disk_read_settings,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
/* use_external_buffer */true,
read_until_position);
};
auto reader_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
disk_read_settings);
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)

View File

@ -12,6 +12,9 @@
#include <Common/getRandomASCIIString.h>
#include <Common/MultiVersion.h>
#if USE_AZURE_BLOB_STORAGE
#include <azure/storage/blobs.hpp>
#endif
namespace DB
{

View File

@ -367,6 +367,18 @@ time_t DiskObjectStorage::getLastChanged(const String & path) const
return metadata_storage->getLastChanged(path);
}
struct stat DiskObjectStorage::stat(const String & path) const
{
return metadata_storage->stat(path);
}
void DiskObjectStorage::chmod(const String & path, mode_t mode)
{
auto transaction = createObjectStorageTransaction();
transaction->chmod(path, mode);
transaction->commit();
}
void DiskObjectStorage::shutdown()
{
LOG_INFO(log, "Shutting down disk {}", name);

View File

@ -168,6 +168,12 @@ public:
bool supportsCache() const override;
bool supportsStat() const override { return metadata_storage->supportsStat(); }
struct stat stat(const String & path) const override;
bool supportsChmod() const override { return metadata_storage->supportsChmod(); }
void chmod(const String & path, mode_t mode) override;
private:
/// Create actual disk object storage transaction for operations

View File

@ -613,6 +613,15 @@ void DiskObjectStorageTransaction::setLastModified(const std::string & path, con
}));
}
void DiskObjectStorageTransaction::chmod(const String & path, mode_t mode)
{
operations_to_execute.emplace_back(
std::make_unique<PureMetadataObjectStorageOperation>(object_storage, metadata_storage, [path, mode](MetadataTransactionPtr tx)
{
tx->chmod(path, mode);
}));
}
void DiskObjectStorageTransaction::createFile(const std::string & path)
{
operations_to_execute.emplace_back(

View File

@ -109,6 +109,7 @@ public:
void removeSharedFiles(const RemoveBatchRequest & files, bool keep_all_batch_data, const NameSet & file_names_remove_metadata_only) override;
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
void chmod(const String & path, mode_t mode) override;
void setReadOnly(const std::string & path) override;
void createHardLink(const std::string & src_path, const std::string & dst_path) override;
};

View File

@ -42,6 +42,12 @@ public:
time_t getLastChanged(const std::string & path) const override;
bool supportsChmod() const override { return disk->supportsChmod(); }
bool supportsStat() const override { return disk->supportsStat(); }
struct stat stat(const String & path) const override { return disk->stat(path); }
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
@ -89,6 +95,10 @@ public:
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
bool supportsChmod() const override { return disk->supportsChmod(); }
void chmod(const String & path, mode_t mode) override { disk->chmod(path, mode); }
void setReadOnly(const std::string & path) override;
void unlinkFile(const std::string & path) override;

View File

@ -61,7 +61,19 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
std::optional<size_t>,
std::optional<size_t>) const
{
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, objects, patchSettings(read_settings));
auto disk_read_settings = patchSettings(read_settings);
auto read_buffer_creator =
[this, disk_read_settings]
(const std::string & path, size_t /* read_until_position */) -> std::shared_ptr<ReadBufferFromFileBase>
{
size_t begin_of_path = path.find('/', path.find("//") + 2);
auto hdfs_path = path.substr(begin_of_path);
auto hdfs_uri = path.substr(0, begin_of_path);
return std::make_unique<ReadBufferFromHDFS>(hdfs_uri, hdfs_path, config, disk_read_settings);
};
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}
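The lambda above splits a full HDFS object path into the cluster URI and the in-cluster path by locating the first '/' after the "//host:port" part. A tiny standalone check of that splitting logic (the example path is hypothetical):

``` cpp
#include <cassert>
#include <string>

int main()
{
    const std::string path = "hdfs://namenode:8020/clickhouse/data/part_0";

    // First '/' after "hdfs://host:port" separates the URI from the path inside HDFS.
    size_t begin_of_path = path.find('/', path.find("//") + 2);
    std::string hdfs_uri = path.substr(0, begin_of_path);
    std::string hdfs_path = path.substr(begin_of_path);

    assert(hdfs_uri == "hdfs://namenode:8020");
    assert(hdfs_path == "/clickhouse/data/part_0");
    return 0;
}
```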

View File

@ -37,6 +37,9 @@ public:
virtual void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) = 0;
virtual bool supportsChmod() const = 0;
virtual void chmod(const String & path, mode_t mode) = 0;
virtual void setReadOnly(const std::string & path) = 0;
virtual void unlinkFile(const std::string & path) = 0;
@ -107,6 +110,11 @@ public:
virtual time_t getLastChanged(const std::string & path) const = 0;
virtual bool supportsChmod() const = 0;
virtual bool supportsStat() const = 0;
virtual struct stat stat(const String & path) const = 0;
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
virtual DirectoryIteratorPtr iterateDirectory(const std::string & path) const = 0;

View File

@ -250,6 +250,11 @@ void MetadataStorageFromDiskTransaction::setLastModified(const std::string & pat
addOperation(std::make_unique<SetLastModifiedOperation>(path, timestamp, *metadata_storage.getDisk()));
}
void MetadataStorageFromDiskTransaction::chmod(const String & path, mode_t mode)
{
addOperation(std::make_unique<ChmodOperation>(path, mode, *metadata_storage.getDisk()));
}
void MetadataStorageFromDiskTransaction::unlinkFile(const std::string & path)
{
addOperation(std::make_unique<UnlinkFileOperation>(path, *metadata_storage.getDisk()));

View File

@ -39,6 +39,12 @@ public:
time_t getLastChanged(const std::string & path) const override;
bool supportsChmod() const override { return disk->supportsChmod(); }
bool supportsStat() const override { return disk->supportsStat(); }
struct stat stat(const String & path) const override { return disk->stat(path); }
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) const override;
@ -94,6 +100,10 @@ public:
void setLastModified(const std::string & path, const Poco::Timestamp & timestamp) override;
bool supportsChmod() const override { return metadata_storage.supportsChmod(); }
void chmod(const String & path, mode_t mode) override;
void setReadOnly(const std::string & path) override;
void unlinkFile(const std::string & path) override;

View File

@ -36,6 +36,24 @@ void SetLastModifiedOperation::undo()
disk.setLastModified(path, old_timestamp);
}
ChmodOperation::ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_)
: path(path_)
, mode(mode_)
, disk(disk_)
{
}
void ChmodOperation::execute(std::unique_lock<std::shared_mutex> &)
{
old_mode = disk.stat(path).st_mode;
disk.chmod(path, mode);
}
void ChmodOperation::undo()
{
disk.chmod(path, old_mode);
}
UnlinkFileOperation::UnlinkFileOperation(const std::string & path_, IDisk & disk_)
: path(path_)
, disk(disk_)
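
ChmodOperation follows the same execute/undo contract as the other metadata operations: execute() captures the old mode before changing it, so undo() can restore it if the transaction rolls back. A compilable sketch of that pattern with plain POSIX calls; the IOperation/ChmodOp/commit names below are illustrative, not the ClickHouse classes:

#include <sys/stat.h>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct IOperation
{
    virtual void execute() = 0;
    virtual void undo() = 0;
    virtual ~IOperation() = default;
};

// Captures the previous mode on execute() so the change can be reverted.
struct ChmodOp final : IOperation
{
    ChmodOp(std::string path_, mode_t mode_) : path(std::move(path_)), mode(mode_) {}

    void execute() override
    {
        struct stat st{};
        if (::stat(path.c_str(), &st) != 0)
            throw std::runtime_error("stat failed for " + path);
        old_mode = st.st_mode;
        if (::chmod(path.c_str(), mode) != 0)
            throw std::runtime_error("chmod failed for " + path);
    }

    void undo() override { ::chmod(path.c_str(), old_mode); }

    std::string path;
    mode_t mode;
    mode_t old_mode = 0;
};

// Minimal transaction: run operations in order, undo the completed ones on failure.
void commit(std::vector<std::unique_ptr<IOperation>> & ops)
{
    size_t done = 0;
    try
    {
        for (; done < ops.size(); ++done)
            ops[done]->execute();
    }
    catch (...)
    {
        while (done > 0)
            ops[--done]->undo();
        throw;
    }
}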

View File

@ -37,6 +37,21 @@ private:
IDisk & disk;
};
struct ChmodOperation final : public IMetadataOperation
{
ChmodOperation(const std::string & path_, mode_t mode_, IDisk & disk_);
void execute(std::unique_lock<std::shared_mutex> & metadata_lock) override;
void undo() override;
private:
std::string path;
mode_t mode;
mode_t old_mode;
IDisk & disk;
};
struct UnlinkFileOperation final : public IMetadataOperation
{

View File

@ -144,12 +144,26 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto settings_ptr = s3_settings.get();
auto s3_impl = std::make_unique<ReadBufferFromS3Gather>(
client.get(),
bucket,
version_id,
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
(const std::string & path, size_t read_until_position) -> std::shared_ptr<ReadBufferFromFileBase>
{
return std::make_shared<ReadBufferFromS3>(
client.get(),
bucket,
path,
version_id,
settings_ptr->s3_settings.max_single_read_retries,
disk_read_settings,
/* use_external_buffer */true,
/* offset */0,
read_until_position,
/* restricted_seek */true);
};
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
settings_ptr->s3_settings.max_single_read_retries,
disk_read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)

View File

@ -103,7 +103,7 @@ void NativeWriter::write(const Block & block)
mark.offset_in_decompressed_block = ostr_concrete->getRemainingBytes();
}
ColumnWithTypeAndName column = block.safeGetByPosition(i);
auto column = block.safeGetByPosition(i);
/// Send data to old clients without low cardinality type.
if (remove_low_cardinality || (client_revision && client_revision < DBMS_MIN_REVISION_WITH_LOW_CARDINALITY_TYPE))
@ -145,7 +145,7 @@ void NativeWriter::write(const Block & block)
SerializationPtr serialization;
if (client_revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION)
{
auto info = column.column->getSerializationInfo();
auto info = column.type->getSerializationInfo(*column.column);
serialization = column.type->getSerialization(*info);
bool has_custom = info->hasCustomSerialization();

View File

@ -141,6 +141,8 @@ public:
void getLambdaArgumentTypesImpl(DataTypes & arguments) const override { function->getLambdaArgumentTypes(arguments); }
const IFunction * getFunction() const { return function.get(); }
private:
std::shared_ptr<IFunction> function;
};

View File

@ -1,4 +1,4 @@
#include <Functions/IFunction.h>
#include <Functions/indexHint.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
@ -6,60 +6,6 @@
namespace DB
{
/** The `indexHint` function takes any number of any arguments and always returns one.
*
* This function has a special meaning (see ExpressionAnalyzer, KeyCondition)
* - the expressions inside it are not evaluated;
* - but when analyzing the index (selecting ranges for reading), this function is treated the same way,
* as if instead of using it the expression itself would be.
*
* Example: WHERE something AND indexHint(CounterID = 34)
* - do not read or calculate CounterID = 34, but select ranges in which the CounterID = 34 expression can be true.
*
* The function can be used for debugging purposes, as well as for (hidden from the user) query conversions.
*/
class FunctionIndexHint : public IFunction
{
public:
static constexpr auto name = "indexHint";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionIndexHint>();
}
bool isVariadic() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
String getName() const override
{
return name;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt8().createColumnConst(input_rows_count, 1u);
}
};
REGISTER_FUNCTION(IndexHint)
{
factory.registerFunction<FunctionIndexHint>();

src/Functions/indexHint.h Normal file (70 lines added)
src/Functions/indexHint.h Normal file (70 lines added)
View File

@ -0,0 +1,70 @@
#pragma once
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
/** The `indexHint` function takes any number of any arguments and always returns one.
*
* This function has a special meaning (see ExpressionAnalyzer, KeyCondition)
* - the expressions inside it are not evaluated;
* - but when analyzing the index (selecting ranges for reading), this function is treated the same way,
* as if instead of using it the expression itself would be.
*
* Example: WHERE something AND indexHint(CounterID = 34)
* - do not read or calculate CounterID = 34, but select ranges in which the CounterID = 34 expression can be true.
*
* The function can be used for debugging purposes, as well as for (hidden from the user) query conversions.
*/
class FunctionIndexHint : public IFunction
{
public:
static constexpr auto name = "indexHint";
static FunctionPtr create(ContextPtr)
{
return std::make_shared<FunctionIndexHint>();
}
bool isVariadic() const override
{
return true;
}
size_t getNumberOfArguments() const override
{
return 0;
}
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
String getName() const override
{
return name;
}
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt8>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
return DataTypeUInt8().createColumnConst(input_rows_count, 1u);
}
void setActions(ActionsDAGPtr actions_) { actions = std::move(actions_); }
const ActionsDAGPtr & getActions() const { return actions; }
private:
ActionsDAGPtr actions;
};
}
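
The point of moving FunctionIndexHint into a header and giving it setActions/getActions is that the hint condition now travels with the function object as an ActionsDAG: execution still sees a constant 1, while index analysis can later pull the stored condition back out. A toy sketch of that "trivially evaluated function carrying an analysis-only payload" idea; IndexHint and HintPredicate here are illustrative stand-ins, not the real classes:

#include <functional>
#include <iostream>

// Stand-in for the analysis-only payload (an ActionsDAG in the real code):
// a predicate that execution never evaluates.
using HintPredicate = std::function<bool(int)>;

class IndexHint
{
public:
    // Execution path: the function is effectively a constant.
    int execute() const { return 1; }

    // Analysis path: the planner can retrieve the attached condition.
    void setPredicate(HintPredicate p) { predicate = std::move(p); }
    const HintPredicate & getPredicate() const { return predicate; }

private:
    HintPredicate predicate;
};

int main()
{
    IndexHint hint;
    hint.setPredicate([](int counter_id) { return counter_id == 34; });

    // During execution the hint is just "1"...
    std::cout << "executed value: " << hint.execute() << '\n';
    // ...while index analysis may consult the stored condition to prune ranges.
    std::cout << "range with CounterID=34 can match: " << hint.getPredicate()(34) << '\n';
}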

View File

@ -144,6 +144,9 @@ const ActionsDAG::Node & ActionsDAG::addArrayJoin(const Node & child, std::strin
if (!array_type)
throw Exception("ARRAY JOIN requires array argument", ErrorCodes::TYPE_MISMATCH);
if (result_name.empty())
result_name = "arrayJoin(" + child.result_name + ")";
Node node;
node.type = ActionType::ARRAY_JOIN;
node.result_type = array_type->getNestedType();

View File

@ -313,4 +313,10 @@ private:
static ActionsDAGPtr cloneActionsForConjunction(NodeRawConstPtrs conjunction, const ColumnsWithTypeAndName & all_inputs);
};
/// This is an ugly way to work around the impossibility of forward-declaring ActionsDAG::Node.
struct ActionDAGNodes
{
ActionsDAG::NodeRawConstPtrs nodes;
};
}

View File

@ -9,6 +9,7 @@
#include <Functions/grouping.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsMiscellaneous.h>
#include <Functions/indexHint.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
@ -934,8 +935,44 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
/// A special function `indexHint`. Everything that is inside it is not calculated
if (node.name == "indexHint")
{
if (data.only_consts)
return;
/// Here we create a separate DAG for indexHint condition.
/// It will be used only for index analysis.
Data index_hint_data(
data.getContext(),
data.set_size_limit,
data.subquery_depth,
data.source_columns,
std::make_shared<ActionsDAG>(data.source_columns),
data.prepared_sets,
data.subqueries_for_sets,
data.no_subqueries,
data.no_makeset,
data.only_consts,
/*create_source_for_in*/ false,
data.aggregation_keys_info);
NamesWithAliases args;
if (node.arguments)
{
for (const auto & arg : node.arguments->children)
{
visit(arg, index_hint_data);
args.push_back({arg->getColumnNameWithoutAlias(), {}});
}
}
auto dag = index_hint_data.getActions();
dag->project(args);
auto index_hint = std::make_shared<FunctionIndexHint>();
index_hint->setActions(std::move(dag));
// Arguments are removed. We add function instead of constant column to avoid constant folding.
data.addFunction(FunctionFactory::instance().get("indexHint", data.getContext()), {}, column_name);
data.addFunction(std::make_unique<FunctionToOverloadResolverAdaptor>(index_hint), {}, column_name);
return;
}

View File

@ -5,6 +5,7 @@
#include <Databases/IDatabase.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseOnDisk.h>
#include <Disks/IDisk.h>
#include <Common/quoteString.h>
#include <Storages/StorageMemory.h>
#include <Storages/LiveView/TemporaryLiveViewCleaner.h>
@ -19,10 +20,6 @@
#include <Common/filesystemHelpers.h>
#include <Common/noexcept_scope.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <utime.h>
#include "config_core.h"
#if USE_MYSQL
@ -894,7 +891,7 @@ void DatabaseCatalog::enqueueDroppedTableCleanup(StorageID table_id, StoragePtr
create->setTable(table_id.table_name);
try
{
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, getContext(), false).second;
table = createTableFromAST(*create, table_id.getDatabaseName(), data_path, getContext(), /* force_restore */ true).second;
table->is_dropped = true;
}
catch (...)
@ -979,7 +976,6 @@ void DatabaseCatalog::dropTableDataTask()
if (table.table_id)
{
try
{
dropTableFinally(table);
@ -1019,13 +1015,15 @@ void DatabaseCatalog::dropTableFinally(const TableMarkedAsDropped & table)
table.table->drop();
}
/// Even if table is not loaded, try remove its data from disk.
/// TODO remove data from all volumes
fs::path data_path = fs::path(getContext()->getPath()) / "store" / getPathForUUID(table.table_id.uuid);
if (fs::exists(data_path))
/// Even if table is not loaded, try remove its data from disks.
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
LOG_INFO(log, "Removing data directory {} of dropped table {}", data_path.string(), table.table_id.getNameForLogs());
fs::remove_all(data_path);
String data_path = "store/" + getPathForUUID(table.table_id.uuid);
if (!disk->exists(data_path))
continue;
LOG_INFO(log, "Removing data directory {} of dropped table {} from disk {}", data_path, table.table_id.getNameForLogs(), disk_name);
disk->removeRecursive(data_path);
}
LOG_INFO(log, "Removing metadata {} of dropped table {}", table.metadata_path, table.table_id.getNameForLogs());
@ -1169,121 +1167,118 @@ void DatabaseCatalog::updateLoadingDependencies(const StorageID & table_id, Tabl
void DatabaseCatalog::cleanupStoreDirectoryTask()
{
fs::path store_path = fs::path(getContext()->getPath()) / "store";
size_t affected_dirs = 0;
for (const auto & prefix_dir : fs::directory_iterator{store_path})
for (const auto & [disk_name, disk] : getContext()->getDisksMap())
{
String prefix = prefix_dir.path().filename();
bool expected_prefix_dir = prefix_dir.is_directory() &&
prefix.size() == 3 &&
isHexDigit(prefix[0]) &&
isHexDigit(prefix[1]) &&
isHexDigit(prefix[2]);
if (!expected_prefix_dir)
{
LOG_WARNING(log, "Found invalid directory {}, will try to remove it", prefix_dir.path().string());
affected_dirs += maybeRemoveDirectory(prefix_dir.path());
if (!disk->supportsStat() || !disk->supportsChmod())
continue;
}
for (const auto & uuid_dir : fs::directory_iterator{prefix_dir.path()})
size_t affected_dirs = 0;
for (auto it = disk->iterateDirectory("store"); it->isValid(); it->next())
{
String uuid_str = uuid_dir.path().filename();
UUID uuid;
bool parsed = tryParse(uuid, uuid_str);
String prefix = it->name();
bool expected_prefix_dir = disk->isDirectory(it->path()) && prefix.size() == 3 && isHexDigit(prefix[0]) && isHexDigit(prefix[1])
&& isHexDigit(prefix[2]);
bool expected_dir = uuid_dir.is_directory() &&
parsed &&
uuid != UUIDHelpers::Nil &&
uuid_str.starts_with(prefix);
if (!expected_dir)
if (!expected_prefix_dir)
{
LOG_WARNING(log, "Found invalid directory {}, will try to remove it", uuid_dir.path().string());
affected_dirs += maybeRemoveDirectory(uuid_dir.path());
LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", it->path(), disk_name);
affected_dirs += maybeRemoveDirectory(disk_name, disk, it->path());
continue;
}
/// Order is important
if (!hasUUIDMapping(uuid))
for (auto jt = disk->iterateDirectory(it->path()); jt->isValid(); jt->next())
{
/// We load uuids even for detached and permanently detached tables,
/// so it looks safe enough to remove directory if we don't have uuid mapping for it.
/// No table or database using this directory should concurrently appear,
/// because creation of new table would fail with "directory already exists".
affected_dirs += maybeRemoveDirectory(uuid_dir.path());
String uuid_str = jt->name();
UUID uuid;
bool parsed = tryParse(uuid, uuid_str);
bool expected_dir = disk->isDirectory(jt->path()) && parsed && uuid != UUIDHelpers::Nil && uuid_str.starts_with(prefix);
if (!expected_dir)
{
LOG_WARNING(log, "Found invalid directory {} on disk {}, will try to remove it", jt->path(), disk_name);
affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path());
continue;
}
/// Order is important
if (!hasUUIDMapping(uuid))
{
/// We load uuids even for detached and permanently detached tables,
/// so it looks safe enough to remove directory if we don't have uuid mapping for it.
/// No table or database using this directory should concurrently appear,
/// because creation of new table would fail with "directory already exists".
affected_dirs += maybeRemoveDirectory(disk_name, disk, jt->path());
}
}
}
}
if (affected_dirs)
LOG_INFO(log, "Cleaned up {} directories from store/", affected_dirs);
if (affected_dirs)
LOG_INFO(log, "Cleaned up {} directories from store/ on disk {}", affected_dirs, disk_name);
}
(*cleanup_task)->scheduleAfter(unused_dir_cleanup_period_sec * 1000);
}
bool DatabaseCatalog::maybeRemoveDirectory(const fs::path & unused_dir)
bool DatabaseCatalog::maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir)
{
/// "Safe" automatic removal of some directory.
/// At first we do not remove anything and only revoke all access right.
/// And remove only if nobody noticed it after, for example, one month.
struct stat st;
if (stat(unused_dir.string().c_str(), &st))
try
{
LOG_ERROR(log, "Failed to stat {}, errno: {}", unused_dir.string(), errno);
struct stat st = disk->stat(unused_dir);
if (st.st_uid != geteuid())
{
/// Directory is not owned by clickhouse, it's weird, let's ignore it (chmod will likely fail anyway).
LOG_WARNING(log, "Found directory {} with unexpected owner (uid={}) on disk {}", unused_dir, st.st_uid, disk_name);
return false;
}
time_t max_modification_time = std::max(st.st_atime, std::max(st.st_mtime, st.st_ctime));
time_t current_time = time(nullptr);
if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
{
if (current_time <= max_modification_time + unused_dir_hide_timeout_sec)
return false;
LOG_INFO(log, "Removing access rights for unused directory {} from disk {} (will remove it when timeout exceed)", unused_dir, disk_name);
/// Explicitly update modification time just in case
disk->setLastModified(unused_dir, Poco::Timestamp::fromEpochTime(current_time));
/// Remove all access right
disk->chmod(unused_dir, 0);
return true;
}
else
{
if (!unused_dir_rm_timeout_sec)
return false;
if (current_time <= max_modification_time + unused_dir_rm_timeout_sec)
return false;
LOG_INFO(log, "Removing unused directory {} from disk {}", unused_dir, disk_name);
/// We have to set these access rights to make recursive removal work
disk->chmod(unused_dir, S_IRWXU);
disk->removeRecursive(unused_dir);
return true;
}
}
catch (...)
{
tryLogCurrentException(log, fmt::format("Failed to remove unused directory {} from disk {} ({})",
unused_dir, disk->getName(), disk->getPath()));
return false;
}
if (st.st_uid != geteuid())
{
/// Directory is not owned by clickhouse, it's weird, let's ignore it (chmod will likely fail anyway).
LOG_WARNING(log, "Found directory {} with unexpected owner (uid={})", unused_dir.string(), st.st_uid);
return false;
}
time_t max_modification_time = std::max(st.st_atime, std::max(st.st_mtime, st.st_ctime));
time_t current_time = time(nullptr);
if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
{
if (current_time <= max_modification_time + unused_dir_hide_timeout_sec)
return false;
LOG_INFO(log, "Removing access rights for unused directory {} (will remove it when timeout exceed)", unused_dir.string());
/// Explicitly update modification time just in case
struct utimbuf tb;
tb.actime = current_time;
tb.modtime = current_time;
if (utime(unused_dir.string().c_str(), &tb) != 0)
LOG_ERROR(log, "Failed to utime {}, errno: {}", unused_dir.string(), errno);
/// Remove all access right
if (chmod(unused_dir.string().c_str(), 0))
LOG_ERROR(log, "Failed to chmod {}, errno: {}", unused_dir.string(), errno);
return true;
}
else
{
if (!unused_dir_rm_timeout_sec)
return false;
if (current_time <= max_modification_time + unused_dir_rm_timeout_sec)
return false;
LOG_INFO(log, "Removing unused directory {}", unused_dir.string());
/// We have to set these access rights to make recursive removal work
if (chmod(unused_dir.string().c_str(), S_IRWXU))
LOG_ERROR(log, "Failed to chmod {}, errno: {}", unused_dir.string(), errno);
fs::remove_all(unused_dir);
return true;
}
}
static void maybeUnlockUUID(UUID uuid)
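
maybeRemoveDirectory keeps its two-phase policy on every disk: first all access rights are revoked (chmod 0) for a directory that looks unused, and only if it then stays untouched for unused_dir_rm_timeout_sec more seconds is it actually removed. A standalone sketch of that policy with plain POSIX and std::filesystem calls; the function name and timeouts below are illustrative:

#include <sys/stat.h>
#include <algorithm>
#include <ctime>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// Phase 1: hide (chmod 0) directories untouched for hide_timeout seconds.
// Phase 2: remove directories that stayed hidden for rm_timeout more seconds.
bool maybeRemoveUnusedDir(const std::string & dir, time_t hide_timeout, time_t rm_timeout)
{
    struct stat st{};
    if (::stat(dir.c_str(), &st) != 0)
        return false;

    time_t last_used = std::max({st.st_atime, st.st_mtime, st.st_ctime});
    time_t now = ::time(nullptr);

    if (st.st_mode & (S_IRWXU | S_IRWXG | S_IRWXO))
    {
        if (now <= last_used + hide_timeout)
            return false;                      /// still considered in use
        ::chmod(dir.c_str(), 0);               /// revoke access and wait for complaints
        return true;
    }

    if (!rm_timeout || now <= last_used + rm_timeout)
        return false;

    ::chmod(dir.c_str(), S_IRWXU);             /// needed for recursive removal
    fs::remove_all(dir);
    return true;
}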

View File

@ -31,10 +31,12 @@ class IDatabase;
class Exception;
class ColumnsDescription;
struct ConstraintsDescription;
class IDisk;
using DatabasePtr = std::shared_ptr<IDatabase>;
using DatabaseAndTable = std::pair<DatabasePtr, StoragePtr>;
using Databases = std::map<String, std::shared_ptr<IDatabase>>;
using DiskPtr = std::shared_ptr<IDisk>;
/// Table -> set of table-views that make SELECT from it.
using ViewDependencies = std::map<StorageID, std::set<StorageID>>;
@ -271,7 +273,7 @@ private:
void dropTableFinally(const TableMarkedAsDropped & table);
void cleanupStoreDirectoryTask();
bool maybeRemoveDirectory(const fs::path & unused_dir);
bool maybeRemoveDirectory(const String & disk_name, const DiskPtr & disk, const String & unused_dir);
static constexpr size_t reschedule_time_ms = 100;
static constexpr time_t drop_error_cooldown_sec = 5;

View File

@ -36,6 +36,7 @@ struct RequiredSourceColumnsData
bool has_table_join = false;
bool has_array_join = false;
bool visit_index_hint = false;
bool addColumnAliasIfAny(const IAST & ast);
void addColumnIdentifier(const ASTIdentifier & node);

View File

@ -52,10 +52,8 @@ bool RequiredSourceColumnsMatcher::needChildVisit(const ASTPtr & node, const AST
if (const auto * f = node->as<ASTFunction>())
{
/// "indexHint" is a special function for index analysis.
/// Everything that is inside it is not calculated. See KeyCondition
/// "lambda" visit children itself.
if (f->name == "indexHint" || f->name == "lambda")
if (f->name == "lambda")
return false;
}
@ -73,6 +71,11 @@ void RequiredSourceColumnsMatcher::visit(const ASTPtr & ast, Data & data)
}
if (auto * t = ast->as<ASTFunction>())
{
/// "indexHint" is a special function for index analysis.
/// Everything that is inside it is not calculated. See KeyCondition
if (!data.visit_index_hint && t->name == "indexHint")
return;
data.addColumnAliasIfAny(*ast);
visit(*t, ast, data);
return;

View File

@ -965,12 +965,13 @@ void TreeRewriterResult::collectSourceColumns(bool add_special)
/// Calculate which columns are required to execute the expression.
/// Then, delete all other columns from the list of available columns.
/// After execution, columns will only contain the list of columns needed to read from the table.
void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select)
void TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select, bool visit_index_hint)
{
/// We calculate required_source_columns with source_columns modifications and swap them on exit
required_source_columns = source_columns;
RequiredSourceColumnsVisitor::Data columns_context;
columns_context.visit_index_hint = visit_index_hint;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet source_column_names;
@ -1307,7 +1308,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
result.aggregates = getAggregates(query, *select_query);
result.window_function_asts = getWindowFunctions(query, *select_query);
result.expressions_with_window_function = getExpressionsWithWindowFunctions(query);
result.collectUsedColumns(query, true);
result.collectUsedColumns(query, true, settings.query_plan_optimize_primary_key);
result.required_source_columns_before_expanding_alias_columns = result.required_source_columns.getNames();
/// rewrite filters for select query, must go after getArrayJoinedColumns
@ -1331,7 +1332,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
result.aggregates = getAggregates(query, *select_query);
result.window_function_asts = getWindowFunctions(query, *select_query);
result.expressions_with_window_function = getExpressionsWithWindowFunctions(query);
result.collectUsedColumns(query, true);
result.collectUsedColumns(query, true, settings.query_plan_optimize_primary_key);
}
}
@ -1397,7 +1398,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
else
assertNoAggregates(query, "in wrong place");
result.collectUsedColumns(query, false);
result.collectUsedColumns(query, false, settings.query_plan_optimize_primary_key);
return std::make_shared<const TreeRewriterResult>(result);
}

View File

@ -88,7 +88,7 @@ struct TreeRewriterResult
bool add_special = true);
void collectSourceColumns(bool add_special);
void collectUsedColumns(const ASTPtr & query, bool is_select);
void collectUsedColumns(const ASTPtr & query, bool is_select, bool visit_index_hint);
Names requiredSourceColumns() const { return required_source_columns.getNames(); }
const Names & requiredSourceColumnsForAccessCheck() const { return required_source_columns_before_expanding_alias_columns; }
NameSet getArrayJoinSourceNameSet() const;

View File

@ -12,6 +12,8 @@ namespace QueryPlanOptimizations
/// This is the main function which optimizes the whole QueryPlan tree.
void optimizeTree(const QueryPlanOptimizationSettings & settings, QueryPlan::Node & root, QueryPlan::Nodes & nodes);
void optimizePrimaryKeyCondition(QueryPlan::Node & root);
/// Optimization is a function applied to QueryPlan::Node.
/// It can read and update subtree of specified node.
/// It return the number of updated layers of subtree if some change happened.

View File

@ -0,0 +1,49 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Storages/StorageMerge.h>
#include <Interpreters/ActionsDAG.h>
#include <stack>
namespace DB::QueryPlanOptimizations
{
void optimizePrimaryKeyCondition(QueryPlan::Node & root)
{
struct Frame
{
QueryPlan::Node * node = nullptr;
size_t next_child = 0;
};
std::stack<Frame> stack;
stack.push({.node = &root});
while (!stack.empty())
{
auto & frame = stack.top();
/// Traverse all children first.
if (frame.next_child < frame.node->children.size())
{
stack.push({.node = frame.node->children[frame.next_child]});
++frame.next_child;
continue;
}
if (auto * filter_step = typeid_cast<FilterStep *>(frame.node->step.get()))
{
auto * child = frame.node->children.at(0);
if (auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(child->step.get()))
read_from_merge_tree->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
if (auto * read_from_merge = typeid_cast<ReadFromMerge *>(child->step.get()))
read_from_merge->addFilter(filter_step->getExpression(), filter_step->getFilterColumnName());
}
stack.pop();
}
}
}
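
The new optimizePrimaryKeyCondition pass walks the plan with an explicit stack so that children are processed before their parent, and then pushes a FilterStep's expression down into the reading step beneath it. The traversal skeleton in isolation, over a generic tree; Node and Frame mirror the names in the diff, the rest is a toy:

#include <iostream>
#include <stack>
#include <string>
#include <vector>

struct Node
{
    std::string name;
    std::vector<Node *> children;
};

// Post-order traversal without recursion: each frame remembers which child
// of its node is visited next; the node itself is processed only after all
// of its children have been pushed and popped.
void traversePostOrder(Node & root)
{
    struct Frame
    {
        Node * node = nullptr;
        size_t next_child = 0;
    };

    std::stack<Frame> stack;
    stack.push({.node = &root});

    while (!stack.empty())
    {
        auto & frame = stack.top();

        if (frame.next_child < frame.node->children.size())
        {
            stack.push({.node = frame.node->children[frame.next_child]});
            ++frame.next_child;
            continue;
        }

        // In the real pass this is where a FilterStep would hand its
        // expression down to ReadFromMergeTree / ReadFromMerge.
        std::cout << "visit " << frame.node->name << '\n';
        stack.pop();
    }
}

int main()
{
    Node read{"ReadFromMergeTree", {}};
    Node filter{"FilterStep", {&read}};
    Node root{"Expression", {&filter}};
    traversePostOrder(root);   // prints children before parents
}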

View File

@ -434,6 +434,7 @@ void QueryPlan::explainPipeline(WriteBuffer & buffer, const ExplainPipelineOptio
void QueryPlan::optimize(const QueryPlanOptimizationSettings & optimization_settings)
{
QueryPlanOptimizations::optimizeTree(optimization_settings, *root, nodes);
QueryPlanOptimizations::optimizePrimaryKeyCondition(*root);
}
void QueryPlan::explainEstimate(MutableColumns & columns)

View File

@ -103,6 +103,8 @@ public:
std::vector<Node *> children = {};
};
const Node * getRootNode() const { return root; }
using Nodes = std::list<Node>;
private:

View File

@ -834,6 +834,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
{
return selectRangesToRead(
std::move(parts),
prewhere_info,
added_filter,
added_filter_column_name,
storage_snapshot->metadata,
storage_snapshot->getMetadataForQuery(),
query_info,
@ -848,6 +851,9 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(Merge
MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -882,9 +888,31 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
// Build and check if primary key is used when necessary
const auto & primary_key = metadata_snapshot->getPrimaryKey();
Names primary_key_columns = primary_key.column_names;
KeyCondition key_condition(query_info, context, primary_key_columns, primary_key.expression);
std::optional<KeyCondition> key_condition;
if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
if (settings.query_plan_optimize_primary_key)
{
ActionDAGNodes nodes;
if (prewhere_info)
{
const auto & node = prewhere_info->prewhere_actions->findInIndex(prewhere_info->prewhere_column_name);
nodes.nodes.push_back(&node);
}
if (added_filter)
{
const auto & node = added_filter->findInIndex(added_filter_column_name);
nodes.nodes.push_back(&node);
}
key_condition.emplace(std::move(nodes), query_info.syntax_analyzer_result, query_info.sets, context, primary_key_columns, primary_key.expression);
}
else
{
key_condition.emplace(query_info.query, query_info.syntax_analyzer_result, query_info.sets, context, primary_key_columns, primary_key.expression);
}
if (settings.force_primary_key && key_condition->alwaysUnknownOrTrue())
{
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{
.result = std::make_exception_ptr(Exception(
@ -892,7 +920,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
"Primary key ({}) is not used and setting 'force_primary_key' is set",
fmt::join(primary_key_columns, ", ")))});
}
LOG_DEBUG(log, "Key condition: {}", key_condition.toString());
LOG_DEBUG(log, "Key condition: {}", key_condition->toString());
const auto & select = query_info.query->as<ASTSelectQuery &>();
@ -915,7 +943,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
select,
metadata_snapshot->getColumns().getAllPhysical(),
parts,
key_condition,
*key_condition,
data,
metadata_snapshot,
context,
@ -940,7 +968,7 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
metadata_snapshot,
query_info,
context,
key_condition,
*key_condition,
reader_settings,
log,
num_streams,
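
selectRangesToRead now builds the KeyCondition either from the pushed-down filter DAG nodes (when query_plan_optimize_primary_key is enabled) or from the query AST, using std::optional plus emplace so that a type without a default constructor can be initialized in one branch or the other and used uniformly afterwards. The shape of that pattern in isolation; Condition is a placeholder type, not KeyCondition:

#include <iostream>
#include <optional>
#include <string>

// Placeholder for a type like KeyCondition: no default constructor,
// two different ways to build it.
struct Condition
{
    explicit Condition(std::string source_) : source(std::move(source_)) {}
    std::string toString() const { return "built from " + source; }
    std::string source;
};

int main(int argc, char **)
{
    bool use_dag_nodes = argc > 1;   /// stands in for query_plan_optimize_primary_key

    std::optional<Condition> condition;
    if (use_dag_nodes)
        condition.emplace("filter DAG nodes");
    else
        condition.emplace("query AST");

    // From here on the code goes only through the optional, as the diff
    // does with key_condition->alwaysUnknownOrTrue(), key_condition->toString(), etc.
    std::cout << condition->toString() << '\n';
}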

View File

@ -114,6 +114,12 @@ public:
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeIndexes(JSONBuilder::JSONMap & map) const override;
void addFilter(ActionsDAGPtr expression, std::string column_name)
{
added_filter = std::move(expression);
added_filter_column_name = std::move(column_name);
}
StorageID getStorageID() const { return data.getStorageID(); }
UInt64 getSelectedParts() const { return selected_parts; }
UInt64 getSelectedRows() const { return selected_rows; }
@ -121,6 +127,9 @@ public:
static MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(
MergeTreeData::DataPartsVector parts,
const PrewhereInfoPtr & prewhere_info,
const ActionsDAGPtr & added_filter,
const std::string & added_filter_column_name,
const StorageMetadataPtr & metadata_snapshot_base,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
@ -151,6 +160,9 @@ private:
PrewhereInfoPtr prewhere_info;
ExpressionActionsSettings actions_settings;
ActionsDAGPtr added_filter;
std::string added_filter_column_name;
StorageSnapshotPtr storage_snapshot;
StorageMetadataPtr metadata_for_reading;

View File

@ -410,6 +410,14 @@ Chain buildPushingToViewsChain(
if (result_chain.empty())
result_chain.addSink(std::make_shared<NullSinkToStorage>(storage_header));
if (result_chain.getOutputHeader().columns() != 0)
{
/// Convert result header to empty block.
auto dag = ActionsDAG::makeConvertingActions(result_chain.getOutputHeader().getColumnsWithTypeAndName(), {}, ActionsDAG::MatchColumnsMode::Name);
auto actions = std::make_shared<ExpressionActions>(std::move(dag));
result_chain.addSink(std::make_shared<ConvertingTransform>(result_chain.getOutputHeader(), std::move(actions)));
}
return result_chain;
}

View File

@ -23,6 +23,7 @@
#include <Common/typeid_cast.h>
#include <Common/CurrentThread.h>
#include "Core/SortDescription.h"
#include <QueryPipeline/narrowPipe.h>
#include <Processors/DelayedPortsProcessor.h>
#include <Processors/RowsBeforeLimitCounter.h>
#include <Processors/Sources/RemoteSource.h>
@ -195,6 +196,12 @@ void QueryPipelineBuilder::resize(size_t num_streams, bool force, bool strict)
pipe.resize(num_streams, force, strict);
}
void QueryPipelineBuilder::narrow(size_t size)
{
checkInitializedAndNotCompleted();
narrowPipe(pipe, size);
}
void QueryPipelineBuilder::addTotalsHavingTransform(ProcessorPtr transform)
{
checkInitializedAndNotCompleted();

View File

@ -94,6 +94,11 @@ public:
/// Changes the number of output ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
/// Concat some ports to have no more than size outputs.
/// This method is needed for Merge table engine in case of reading from many tables.
/// It prevents opening too many files at the same time.
void narrow(size_t size);
/// Unite several pipelines together. Result pipeline would have common_header structure.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static QueryPipelineBuilder unitePipelines(
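
narrow() caps the number of output ports: with many source tables (the Merge engine case mentioned in the comment), surplus streams are concatenated together via narrowPipe rather than kept open in parallel, which bounds the number of simultaneously open files. A toy sketch of grouping N sources into at most `size` concatenated outputs; narrowToAtMost is an illustrative helper, not the narrowPipe implementation:

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Group `sources` into at most `size` outputs by round-robin assignment;
// each output conceptually concatenates the sources assigned to it.
std::vector<std::vector<std::string>> narrowToAtMost(const std::vector<std::string> & sources, size_t size)
{
    size_t outputs = std::min(size, sources.size());
    std::vector<std::vector<std::string>> groups(outputs);
    for (size_t i = 0; i < sources.size(); ++i)
        groups[i % outputs].push_back(sources[i]);
    return groups;
}

int main()
{
    auto groups = narrowToAtMost({"t1", "t2", "t3", "t4", "t5"}, 2);
    for (size_t i = 0; i < groups.size(); ++i)
    {
        std::cout << "output " << i << ":";
        for (const auto & table : groups[i])
            std::cout << ' ' << table;
        std::cout << '\n';
    }
}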

View File

@ -190,7 +190,10 @@ void TCPHandler::runImpl()
/// If we need to shut down, or client disconnects.
if (!tcp_server.isOpen() || server.isCancelled() || in->eof())
{
LOG_TEST(log, "Closing connection (open: {}, cancelled: {}, eof: {})", tcp_server.isOpen(), server.isCancelled(), in->eof());
break;
}
Stopwatch watch;
state.reset();
@ -406,6 +409,8 @@ void TCPHandler::runImpl()
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT)
throw;
LOG_TEST(log, "Going to close connection due to exception: {}", e.message());
/// If there is UNEXPECTED_PACKET_FROM_CLIENT emulate network_error
/// to break the loop, but do not throw to send the exception to
/// the client.
@ -435,7 +440,7 @@ void TCPHandler::runImpl()
// Server should die on std logic errors in debug, like with assert()
// or ErrorCodes::LOGICAL_ERROR. This helps catch these errors in
// tests.
#ifndef NDEBUG
#ifdef ABORT_ON_LOGICAL_ERROR
catch (const std::logic_error & e)
{
state.io.onException();
@ -554,14 +559,14 @@ bool TCPHandler::readDataNext()
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
/// Poll interval should not be greater than receive_timeout
constexpr UInt64 min_timeout_ms = 5000; // 5 ms
UInt64 timeout_ms = std::max(min_timeout_ms, std::min(poll_interval * 1000000, static_cast<UInt64>(receive_timeout.totalMicroseconds())));
constexpr UInt64 min_timeout_us = 5000; // 5 ms
UInt64 timeout_us = std::max(min_timeout_us, std::min(poll_interval * 1000000, static_cast<UInt64>(receive_timeout.totalMicroseconds())));
bool read_ok = false;
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (true)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_ms))
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(timeout_us))
{
/// If client disconnected.
if (in->eof())
@ -759,7 +764,22 @@ void TCPHandler::processTablesStatusRequest()
TablesStatusRequest request;
request.read(*in, client_tcp_protocol_version);
ContextPtr context_to_resolve_table_names = (session && session->sessionContext()) ? session->sessionContext() : server.context();
ContextPtr context_to_resolve_table_names;
if (is_interserver_mode)
{
/// In interserver mode the session context does not exist, because authentication is done for each query.
/// We also cannot create query context earlier, because it cannot be created before authentication,
/// but query is not received yet. So we have to do this trick.
ContextMutablePtr fake_interserver_context = Context::createCopy(server.context());
if (!default_database.empty())
fake_interserver_context->setCurrentDatabase(default_database);
context_to_resolve_table_names = fake_interserver_context;
}
else
{
assert(session);
context_to_resolve_table_names = session->sessionContext();
}
TablesStatusResponse response;
for (const QualifiedTableName & table_name: request.tables)
@ -1354,7 +1374,7 @@ void TCPHandler::receiveQuery()
query_context = session->makeQueryContext(std::move(client_info));
/// Sets the default database if it wasn't set earlier for the session context.
if (!default_database.empty() && !session->sessionContext())
if (is_interserver_mode && !default_database.empty())
query_context->setCurrentDatabase(default_database);
if (state.part_uuids_to_ignore)

View File

@ -623,7 +623,7 @@ HiveFiles StorageHive::collectHiveFilesFromPartition(
for (size_t i = 0; i < partition_names.size(); ++i)
ranges.emplace_back(fields[i]);
const KeyCondition partition_key_condition(query_info, getContext(), partition_names, partition_minmax_idx_expr);
const KeyCondition partition_key_condition(query_info.query, query_info.syntax_analyzer_result, query_info.sets, getContext(), partition_names, partition_minmax_idx_expr);
if (!partition_key_condition.checkInHyperrectangle(ranges, partition_types).can_be_true)
return {};
}
@ -691,7 +691,7 @@ HiveFilePtr StorageHive::getHiveFileIfNeeded(
if (prune_level >= PruneLevel::File)
{
const KeyCondition hivefile_key_condition(query_info, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
const KeyCondition hivefile_key_condition(query_info.query, query_info.syntax_analyzer_result, query_info.sets, getContext(), hivefile_name_types.getNames(), hivefile_minmax_idx_expr);
if (hive_file->useFileMinMaxIndex())
{
/// Load file level minmax index and apply

View File

@ -646,7 +646,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
std::make_shared<MergeTreeDataPartInMemory>(data, projection_name, new_part_info, projection_part_storage, new_data_part.get());
new_projection_part->is_temp = false;
new_projection_part->setColumns(block.getNamesAndTypesList());
new_projection_part->setColumns(block.getNamesAndTypesList(), {});
MergeTreePartition partition{};
new_projection_part->partition = std::move(partition);
new_projection_part->minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
@ -676,7 +676,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());
new_data_part->setColumns(block.getNamesAndTypesList(), {});
new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0, context);

View File

@ -15,6 +15,8 @@
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Storages/MergeTree/PartMetadataManagerWithCache.h>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h>
@ -62,6 +64,7 @@ namespace ErrorCodes
extern const int BAD_SIZE_OF_FILE_IN_DATA_PART;
extern const int BAD_TTL_FILE;
extern const int NOT_IMPLEMENTED;
extern const int NO_SUCH_COLUMN_IN_TABLE;
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
@ -434,29 +437,60 @@ std::pair<time_t, time_t> IMergeTreeDataPart::getMinMaxTime() const
}
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns)
void IMergeTreeDataPart::setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos)
{
columns = new_columns;
serialization_infos = new_infos;
column_name_to_position.clear();
column_name_to_position.reserve(new_columns.size());
size_t pos = 0;
for (const auto & column : columns)
column_name_to_position.emplace(column.name, pos++);
for (const auto & column : columns)
{
auto it = serialization_infos.find(column.name);
auto serialization = it == serialization_infos.end()
? IDataType::getSerialization(column)
: IDataType::getSerialization(column, *it->second);
serializations.emplace(column.name, serialization);
IDataType::forEachSubcolumn([&](const auto &, const auto & subname, const auto & subdata)
{
auto full_name = Nested::concatenateName(column.name, subname);
serializations.emplace(full_name, subdata.serialization);
}, {serialization, nullptr, nullptr, nullptr});
}
columns_description = ColumnsDescription(columns);
}
void IMergeTreeDataPart::setSerializationInfos(const SerializationInfoByName & new_infos)
NameAndTypePair IMergeTreeDataPart::getColumn(const String & column_name) const
{
serialization_infos = new_infos;
return columns_description.getColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
}
SerializationPtr IMergeTreeDataPart::getSerialization(const NameAndTypePair & column) const
std::optional<NameAndTypePair> IMergeTreeDataPart::tryGetColumn(const String & column_name) const
{
auto it = serialization_infos.find(column.getNameInStorage());
return it == serialization_infos.end()
? IDataType::getSerialization(column)
: IDataType::getSerialization(column, *it->second);
return columns_description.tryGetColumnOrSubcolumn(GetColumnsOptions::AllPhysical, column_name);
}
SerializationPtr IMergeTreeDataPart::getSerialization(const String & column_name) const
{
auto serialization = tryGetSerialization(column_name);
if (!serialization)
throw Exception(ErrorCodes::NO_SUCH_COLUMN_IN_TABLE,
"There is no column or subcolumn {} in part {}", column_name, name);
return serialization;
}
SerializationPtr IMergeTreeDataPart::tryGetSerialization(const String & column_name) const
{
auto it = serializations.find(column_name);
return it == serializations.end() ? nullptr : it->second;
}
void IMergeTreeDataPart::removeIfNeeded()
@ -563,36 +597,24 @@ size_t IMergeTreeDataPart::getFileSizeOrZero(const String & file_name) const
return checksum->second.file_size;
}
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(
const StorageSnapshotPtr & storage_snapshot, bool with_subcolumns) const
String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
if (with_subcolumns)
options.withSubcolumns();
auto storage_columns = storage_snapshot->getColumns(options);
MergeTreeData::AlterConversions alter_conversions;
if (!parent_part)
alter_conversions = storage.getAlterConversionsForPart(shared_from_this());
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns(with_subcolumns);
auto columns_list = columns_description.get(options);
std::optional<std::string> minimum_size_column;
UInt64 minimum_size = std::numeric_limits<UInt64>::max();
for (const auto & column : storage_columns)
for (const auto & column : columns_list)
{
auto column_name = column.name;
auto column_type = column.type;
if (alter_conversions.isColumnRenamed(column.name))
column_name = alter_conversions.getColumnOldName(column.name);
if (!hasColumnFiles(column))
continue;
const auto size = getColumnSize(column_name).data_compressed;
const auto size = getColumnSize(column.name).data_compressed;
if (size < minimum_size)
{
minimum_size = size;
minimum_size_column = column_name;
minimum_size_column = column.name;
}
}
@ -847,7 +869,7 @@ CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
if (column_size.data_compressed != 0 && !storage_columns.hasCompressionCodec(part_column.name))
{
String path_to_data_file;
getSerialization(part_column)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
getSerialization(part_column.name)->enumerateStreams([&](const ISerialization::SubstreamPath & substream_path)
{
if (path_to_data_file.empty())
{
@ -992,7 +1014,7 @@ void IMergeTreeDataPart::loadRowsCount()
/// Most trivial types
if (column.type->isValueRepresentedByNumber()
&& !column.type->haveSubtypes()
&& getSerialization(column)->getKind() == ISerialization::Kind::DEFAULT)
&& getSerialization(column.name)->getKind() == ISerialization::Kind::DEFAULT)
{
auto size = getColumnSize(column.name);
@ -1044,7 +1066,7 @@ void IMergeTreeDataPart::loadRowsCount()
for (const NameAndTypePair & column : columns)
{
ColumnPtr column_col = column.type->createColumn(*getSerialization(column));
ColumnPtr column_col = column.type->createColumn(*getSerialization(column.name));
if (!column_col->isFixedAndContiguous() || column_col->lowCardinality())
continue;
@ -1186,8 +1208,7 @@ void IMergeTreeDataPart::loadColumns(bool require)
infos.readJSON(*in);
}
setColumns(loaded_columns);
setSerializationInfos(infos);
setColumns(loaded_columns, infos);
}
/// Project part / part with project parts / compact part doesn't support LWD.

View File

@ -15,6 +15,7 @@
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/ColumnsDescription.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
@ -133,15 +134,18 @@ public:
String getTypeName() const { return getType().toString(); }
void setColumns(const NamesAndTypesList & new_columns);
void setColumns(const NamesAndTypesList & new_columns, const SerializationInfoByName & new_infos);
const NamesAndTypesList & getColumns() const { return columns; }
const ColumnsDescription & getColumnsDescription() const { return columns_description; }
void setSerializationInfos(const SerializationInfoByName & new_infos);
NameAndTypePair getColumn(const String & name) const;
std::optional<NameAndTypePair> tryGetColumn(const String & column_name) const;
const SerializationInfoByName & getSerializationInfos() const { return serialization_infos; }
SerializationPtr getSerialization(const NameAndTypePair & column) const;
SerializationPtr getSerialization(const String & column_name) const;
SerializationPtr tryGetSerialization(const String & column_name) const;
/// Throws an exception if part is not stored in on-disk format.
void assertOnDisk() const;
@ -168,8 +172,7 @@ public:
/// Returns the name of a column with minimum compressed size (as returned by getColumnSize()).
/// If no checksums are present returns the name of the first physically existing column.
String getColumnNameWithMinimumCompressedSize(
const StorageSnapshotPtr & storage_snapshot, bool with_subcolumns) const;
String getColumnNameWithMinimumCompressedSize(bool with_subcolumns) const;
bool contains(const IMergeTreeDataPart & other) const { return info.contains(other.info); }
@ -527,6 +530,13 @@ private:
/// Map from name of column to its serialization info.
SerializationInfoByName serialization_infos;
/// Serializations for every column and subcolumn by their names.
SerializationByName serializations;
/// Columns description for more convenient access
/// to columns by name and getting subcolumns.
ColumnsDescription columns_description;
/// Reads part unique identifier (if exists) from uuid.txt
void loadUUID();
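
setColumns now eagerly fills a map of serializations keyed by the column name and by every "column.subcolumn" name, so getSerialization(column_name) becomes a plain lookup that throws for unknown names, while tryGetSerialization returns nullptr. A reduced sketch of that lookup structure; PartColumns, Serialization and the dotted naming are stand-ins for the real types:

#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct Serialization { std::string kind; };
using SerializationPtr = std::shared_ptr<Serialization>;

class PartColumns
{
public:
    // Register a column together with its subcolumns under dotted names,
    // e.g. "n" plus "n.size0".
    void addColumn(const std::string & name, const std::vector<std::string> & subcolumns)
    {
        serializations[name] = std::make_shared<Serialization>(Serialization{"default"});
        for (const auto & sub : subcolumns)
            serializations[name + "." + sub] = std::make_shared<Serialization>(Serialization{"subcolumn"});
    }

    SerializationPtr tryGetSerialization(const std::string & name) const
    {
        auto it = serializations.find(name);
        return it == serializations.end() ? nullptr : it->second;
    }

    SerializationPtr getSerialization(const std::string & name) const
    {
        if (auto serialization = tryGetSerialization(name))
            return serialization;
        throw std::runtime_error("There is no column or subcolumn " + name + " in part");
    }

private:
    std::map<std::string, SerializationPtr> serializations;
};

int main()
{
    PartColumns part;
    part.addColumn("n", {"size0"});
    return part.getSerialization("n.size0") != nullptr ? 0 : 1;
}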

Some files were not shown because too many files have changed in this diff.