Merge branch 'master' into allowed-caches-dir-for-dynamic-disks
This commit is contained in commit ceb74ad3cb
@@ -58,33 +58,6 @@ RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
    rustup target add aarch64-apple-darwin && \
    rustup target add powerpc64le-unknown-linux-gnu

# Create vendor cache for cargo.
#
# Note, that the config.toml for the root is used, you will not be able to
# install any other crates, except those which had been vendored (since if
# there is "replace-with" for some source, then cargo will not look to other
# remotes except this).
#
# Notes for the command itself:
# - --chown is required to preserve the rights
# - unstable-options for -C
# - chmod is required to fix the permissions, since builds are running from a different user
# - copy of the Cargo.lock is required for proper dependencies versions
# - cargo vendor --sync is requried to overcome [1] bug.
#
# [1]: https://github.com/rust-lang/wg-cargo-std-aware/issues/23
COPY --chown=root:root /rust /rust/packages
RUN cargo -Z unstable-options -C /rust/packages vendor > $CARGO_HOME/config.toml && \
    cp "$(rustc --print=sysroot)"/lib/rustlib/src/rust/Cargo.lock "$(rustc --print=sysroot)"/lib/rustlib/src/rust/library/test/ && \
    cargo -Z unstable-options -C /rust/packages vendor --sync "$(rustc --print=sysroot)"/lib/rustlib/src/rust/library/test/Cargo.toml && \
    rm "$(rustc --print=sysroot)"/lib/rustlib/src/rust/library/test/Cargo.lock && \
    sed -i "s#\"vendor\"#\"/rust/vendor\"#" $CARGO_HOME/config.toml && \
    cat $CARGO_HOME/config.toml && \
    mv /rust/packages/vendor /rust/vendor && \
    chmod -R o=r+X /rust/vendor && \
    ls -R -l /rust/packages && \
    rm -r /rust/packages

# NOTE: Seems like gcc-11 is too new for ubuntu20 repository
# A cross-linker for RISC-V 64 (we need it, because LLVM's LLD does not work):
RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
@@ -1 +0,0 @@
../../../rust
@@ -80,9 +80,11 @@ def run_docker_image_with_env(
    output_dir: Path,
    env_variables: List[str],
    ch_root: Path,
    cargo_cache_dir: Path,
    ccache_dir: Optional[Path],
) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)
    cargo_cache_dir.mkdir(parents=True, exist_ok=True)

    env_part = " -e ".join(env_variables)
    if env_part:
@@ -105,7 +107,7 @@ def run_docker_image_with_env(
    cmd = (
        f"docker run --network=host --user={user} --rm {ccache_mount}"
        f"--volume={output_dir}:/output --volume={ch_root}:/build {env_part} "
        f"{interactive} {image_name}"
        f"--volume={cargo_cache_dir}:/rust/cargo/registry {interactive} {image_name}"
    )

    logging.info("Will build ClickHouse pkg with cmd: '%s'", cmd)
@@ -417,6 +419,13 @@ def parse_args() -> argparse.Namespace:
        action="store_true",
        help="if set, the build fails on errors writing cache to S3",
    )
    parser.add_argument(
        "--cargo-cache-dir",
        default=Path(os.getenv("CARGO_HOME", "") or Path.home() / ".cargo")
        / "registry",
        type=dir_name,
        help="a directory to preserve the rust cargo crates",
    )
    parser.add_argument("--force-build-image", action="store_true")
    parser.add_argument("--version")
    parser.add_argument("--official", action="store_true")
@@ -497,6 +506,7 @@ def main() -> None:
        args.output_dir,
        env_prepared,
        ch_root,
        args.cargo_cache_dir,
        args.ccache_dir,
    )
    logging.info("Output placed into %s", args.output_dir)
@@ -57,6 +57,7 @@ if (BUILD_STANDALONE_KEEPER)

    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/IO/ReadBuffer.cpp

    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/HTTPPathHints.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/KeeperTCPHandler.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/TCPServer.cpp
    ${CMAKE_CURRENT_SOURCE_DIR}/../../src/Server/NotFoundHandler.cpp
@@ -150,7 +150,7 @@ int Keeper::run()
    }
    if (config().hasOption("version"))
    {
        std::cout << DBMS_NAME << " keeper version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        std::cout << VERSION_NAME << " keeper version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        return 0;
    }

@@ -389,7 +389,7 @@ int Server::run()
    }
    if (config().hasOption("version"))
    {
        std::cout << DBMS_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        std::cout << VERSION_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
        return 0;
    }
    return Application::run(); // NOLINT
@@ -317,7 +317,7 @@
    <concurrent_threads_soft_limit_ratio_to_cores>0</concurrent_threads_soft_limit_ratio_to_cores>

    <!-- Maximum number of concurrent queries. -->
    <max_concurrent_queries>100</max_concurrent_queries>
    <max_concurrent_queries>1000</max_concurrent_queries>

    <!-- Maximum memory usage (resident set size) for server process.
         Zero value or unset means default. Default is "max_server_memory_usage_to_ram_ratio" of available physical RAM.
@@ -5,6 +5,7 @@ CXXFLAGS = "@RUST_CXXFLAGS@"
[build]
rustflags = @RUSTFLAGS@
rustdocflags = @RUSTFLAGS@
@RUSTCWRAPPER@

[unstable]
@RUST_CARGO_BUILD_STD@
@@ -1,4 +0,0 @@
# Just in case ignore any cargo stuff (and just in case someone will run this
# docker build locally with build context using folder root):
target
vendor
rust/.gitignore (vendored, 4 changed lines)
@@ -1,4 +0,0 @@
# This is for tar --exclude-vcs-ignores (and just in case someone will run
# docker build locally with build context created via tar):
target
vendor
@@ -14,6 +14,13 @@ macro(configure_rustc)
    set(RUST_CFLAGS "${RUST_CFLAGS} --sysroot ${CMAKE_SYSROOT}")
endif()

if(CCACHE_EXECUTABLE MATCHES "/sccache$")
    message(STATUS "Using RUSTC_WRAPPER: ${CCACHE_EXECUTABLE}")
    set(RUSTCWRAPPER "rustc-wrapper = \"${CCACHE_EXECUTABLE}\"")
else()
    set(RUSTCWRAPPER "")
endif()

set(RUSTFLAGS "[]")
set(RUST_CARGO_BUILD_STD "")
# For more info: https://doc.rust-lang.org/beta/unstable-book/compiler-flags/sanitizer.html#memorysanitizer
@@ -19,6 +19,7 @@ message (STATUS "Will build ${VERSION_FULL} revision ${VERSION_REVISION} ${VERSI
include (configure_config.cmake)
configure_file (Common/config.h.in ${CONFIG_INCLUDE_PATH}/config.h)
configure_file (Common/config_version.h.in ${CONFIG_INCLUDE_PATH}/config_version.h)
configure_file (Common/config_version.cpp.in ${CONFIG_INCLUDE_PATH}/config_version.cpp)

if (USE_DEBUG_HELPERS)
    get_target_property(MAGIC_ENUM_INCLUDE_DIR ch_contrib::magic_enum INTERFACE_INCLUDE_DIRECTORIES)
@@ -150,7 +151,7 @@ else()
    message(STATUS "StorageFileLog is only supported on Linux")
endif ()

list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_sources ${CONFIG_INCLUDE_PATH}/config_version.cpp)

list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp Functions/FunctionHelpers.cpp Functions/extractTimeZoneFromFunctionArguments.cpp Functions/FunctionsLogical.cpp Functions/indexHint.cpp)
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/FunctionsLogical.h Functions/indexHint.h)
@@ -2505,7 +2505,7 @@ void ClientBase::clearTerminal()

void ClientBase::showClientVersion()
{
    std::cout << DBMS_NAME << " " + getName() + " version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
    std::cout << VERSION_NAME << " " + getName() + " version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
}

namespace
@@ -280,9 +280,9 @@ void Connection::sendHello()
        "Parameters 'default_database', 'user' and 'password' must not contain ASCII control characters");

    writeVarUInt(Protocol::Client::Hello, *out);
    writeStringBinary((DBMS_NAME " ") + client_name, *out);
    writeVarUInt(DBMS_VERSION_MAJOR, *out);
    writeVarUInt(DBMS_VERSION_MINOR, *out);
    writeStringBinary((VERSION_NAME " ") + client_name, *out);
    writeVarUInt(VERSION_MAJOR, *out);
    writeVarUInt(VERSION_MINOR, *out);
    // NOTE For backward compatibility of the protocol, client cannot send its version_patch.
    writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
    writeStringBinary(default_database, *out);
@@ -106,6 +106,11 @@ public:
        return prompter.getHints(name, getAllRegisteredNames());
    }

    std::vector<String> getHints(const String & name, const std::vector<String> & prompting_strings) const
    {
        return prompter.getHints(name, prompting_strings);
    }

    void appendHintsMessage(String & error_message, const String & name) const
    {
        auto hints = getHints(name);
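The getHints overload added above is what feeds the "Maybe you meant ...?" messages introduced throughout this commit. A minimal standalone sketch of the same name-hinting idea (illustrative only, using a plain edit-distance threshold; not the actual NamePrompter implementation):

// Standalone sketch of the "did you mean" hint pattern (illustrative only).
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

// Classic dynamic-programming edit distance between two short strings.
static size_t editDistance(const std::string & a, const std::string & b)
{
    std::vector<size_t> prev(b.size() + 1), cur(b.size() + 1);
    for (size_t j = 0; j <= b.size(); ++j) prev[j] = j;
    for (size_t i = 1; i <= a.size(); ++i)
    {
        cur[0] = i;
        for (size_t j = 1; j <= b.size(); ++j)
            cur[j] = std::min({prev[j] + 1, cur[j - 1] + 1, prev[j - 1] + (a[i - 1] != b[j - 1])});
        std::swap(prev, cur);
    }
    return prev[b.size()];
}

// Return the registered names closest to the mistyped one, mimicking getHints().
static std::vector<std::string> getHints(const std::string & name, const std::vector<std::string> & registered)
{
    std::vector<std::string> hints;
    for (const auto & candidate : registered)
        if (editDistance(name, candidate) <= 2)  // small, arbitrary threshold for the sketch
            hints.push_back(candidate);
    return hints;
}

int main()
{
    std::vector<std::string> tables{"visits", "hits", "events"};
    auto hints = getHints("vist", tables);
    if (!hints.empty())
        std::cout << "Table vist does not exist. Maybe you meant " << hints.front() << "?\n";
}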
src/Common/config_version.cpp.in (new file, 3 lines)
@@ -0,0 +1,3 @@
/// This file was autogenerated by CMake

const char * VERSION_GITHASH = "@VERSION_GITHASH@";
@@ -6,7 +6,6 @@
// only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
#cmakedefine VERSION_REVISION @VERSION_REVISION@
#cmakedefine VERSION_NAME "@VERSION_NAME@"
#define DBMS_NAME VERSION_NAME
#cmakedefine VERSION_MAJOR @VERSION_MAJOR@
#cmakedefine VERSION_MINOR @VERSION_MINOR@
#cmakedefine VERSION_PATCH @VERSION_PATCH@
@@ -15,27 +14,10 @@
#cmakedefine VERSION_OFFICIAL "@VERSION_OFFICIAL@"
#cmakedefine VERSION_FULL "@VERSION_FULL@"
#cmakedefine VERSION_DESCRIBE "@VERSION_DESCRIBE@"
#cmakedefine VERSION_GITHASH "@VERSION_GITHASH@"
#cmakedefine VERSION_INTEGER @VERSION_INTEGER@
#cmakedefine VERSION_DATE @VERSION_DATE@

#if defined(VERSION_MAJOR)
#define DBMS_VERSION_MAJOR VERSION_MAJOR
#else
#define DBMS_VERSION_MAJOR 0
#endif

#if defined(VERSION_MINOR)
#define DBMS_VERSION_MINOR VERSION_MINOR
#else
#define DBMS_VERSION_MINOR 0
#endif

#if defined(VERSION_PATCH)
#define DBMS_VERSION_PATCH VERSION_PATCH
#else
#define DBMS_VERSION_PATCH 0
#endif
/// These fields are frequently changing and we don't want to have them in the header file to allow caching.
extern const char * VERSION_GITHASH;

#if !defined(VERSION_OFFICIAL)
# define VERSION_OFFICIAL ""
@@ -466,7 +466,7 @@ String EnviCommand::run()

    StringBuffer buf;
    buf << "Environment:\n";
    buf << "clickhouse.keeper.version=" << (String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH) << '\n';
    buf << "clickhouse.keeper.version=" << VERSION_DESCRIBE << '-' << VERSION_GITHASH << '\n';

    buf << "host.name=" << Environment::nodeName() << '\n';
    buf << "os.name=" << Environment::osDisplayName() << '\n';
@@ -9,8 +9,10 @@

#include "config_version.h"


namespace DB
{

struct IFourLetterCommand;
using FourLetterCommandPtr = std::shared_ptr<DB::IFourLetterCommand>;

@@ -43,7 +45,7 @@ public:
    using Commands = std::unordered_map<int32_t, FourLetterCommandPtr>;
    using AllowList = std::vector<int32_t>;

    ///represent '*' which is used in allow list
    /// Represents '*' which is used in allow list.
    static constexpr int32_t ALLOW_LIST_ALL = 0;

    bool isKnown(int32_t code);
@@ -1,7 +1,10 @@
#include <memory>
#include <Databases/IDatabase.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/quoteString.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Common/NamePrompter.h>


namespace DB
@@ -18,7 +21,13 @@ StoragePtr IDatabase::getTable(const String & name, ContextPtr context) const
{
    if (auto storage = tryGetTable(name, context))
        return storage;
    throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} doesn't exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
    TableNameHints hints(this->shared_from_this(), context);
    std::vector<String> names = hints.getHints(name);
    if (!names.empty())
    {
        throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist. Maybe you meant {}?", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name), backQuoteIfNeed(names[0]));
    }
    else throw Exception(ErrorCodes::UNKNOWN_TABLE, "Table {}.{} does not exist", backQuoteIfNeed(getDatabaseName()), backQuoteIfNeed(name));
}

std::vector<std::pair<ASTPtr, StoragePtr>> IDatabase::getTablesForBackup(const FilterByNameFunction &, const ContextPtr &) const
@@ -372,6 +372,7 @@ protected:
};

using DatabasePtr = std::shared_ptr<IDatabase>;
using ConstDatabasePtr = std::shared_ptr<const IDatabase>;
using Databases = std::map<String, DatabasePtr>;

}
@ -1,7 +1,9 @@
|
||||
#include <algorithm>
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Columns/ColumnVector.h>
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
#include <Functions/FunctionFactory.h>
|
||||
#include "arrayScalarProduct.h"
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -10,6 +12,8 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
|
||||
extern const int ILLEGAL_COLUMN;
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
|
||||
@ -70,44 +74,32 @@ namespace ErrorCodes
|
||||
* The "curve" will be present by a line that moves one step either towards right or top on each threshold change.
|
||||
*/
|
||||
|
||||
|
||||
struct NameArrayAUC
|
||||
{
|
||||
static constexpr auto name = "arrayAUC";
|
||||
};
|
||||
|
||||
|
||||
class ArrayAUCImpl
|
||||
class FunctionArrayAUC : public IFunction
|
||||
{
|
||||
public:
|
||||
using ResultType = Float64;
|
||||
static constexpr auto name = "arrayAUC";
|
||||
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayAUC>(); }
|
||||
|
||||
static DataTypePtr getReturnType(const DataTypePtr & /* score_type */, const DataTypePtr & label_type)
|
||||
{
|
||||
if (!(isNumber(label_type) || isEnum(label_type)))
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{} label must have numeric type.", std::string(NameArrayAUC::name));
|
||||
|
||||
return std::make_shared<DataTypeNumber<ResultType>>();
|
||||
}
|
||||
|
||||
template <typename ResultType, typename T, typename U>
|
||||
static ResultType apply(
|
||||
const T * scores,
|
||||
const U * labels,
|
||||
size_t size)
|
||||
private:
|
||||
static Float64 apply(
|
||||
const IColumn & scores,
|
||||
const IColumn & labels,
|
||||
ColumnArray::Offset current_offset,
|
||||
ColumnArray::Offset next_offset)
|
||||
{
|
||||
struct ScoreLabel
|
||||
{
|
||||
T score;
|
||||
Float64 score;
|
||||
bool label;
|
||||
};
|
||||
|
||||
size_t size = next_offset - current_offset;
|
||||
PODArrayWithStackMemory<ScoreLabel, 1024> sorted_labels(size);
|
||||
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
bool label = labels[i] > 0;
|
||||
sorted_labels[i].score = scores[i];
|
||||
bool label = labels.getFloat64(current_offset + i) > 0;
|
||||
sorted_labels[i].score = scores.getFloat64(current_offset + i);
|
||||
sorted_labels[i].label = label;
|
||||
}
|
||||
|
||||
@ -129,18 +121,85 @@ public:
|
||||
/// Then divide the area to the area of rectangle.
|
||||
|
||||
if (count_positive == 0 || count_positive == size)
|
||||
return std::numeric_limits<ResultType>::quiet_NaN();
|
||||
return std::numeric_limits<Float64>::quiet_NaN();
|
||||
|
||||
return static_cast<ResultType>(area) / count_positive / (size - count_positive);
|
||||
return static_cast<Float64>(area) / count_positive / (size - count_positive);
|
||||
}
|
||||
|
||||
static void vector(
|
||||
const IColumn & scores,
|
||||
const IColumn & labels,
|
||||
const ColumnArray::Offsets & offsets,
|
||||
PaddedPODArray<Float64> & result)
|
||||
{
|
||||
size_t size = offsets.size();
|
||||
result.resize(size);
|
||||
|
||||
ColumnArray::Offset current_offset = 0;
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
{
|
||||
auto next_offset = offsets[i];
|
||||
result[i] = apply(scores, labels, current_offset, next_offset);
|
||||
current_offset = next_offset;
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
String getName() const override { return name; }
|
||||
size_t getNumberOfArguments() const override { return 2; }
|
||||
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo &) const override { return false; }
|
||||
|
||||
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
|
||||
{
|
||||
for (size_t i = 0; i < getNumberOfArguments(); ++i)
|
||||
{
|
||||
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(arguments[i].get());
|
||||
if (!array_type)
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "All arguments for function {} must be an array.", getName());
|
||||
|
||||
const auto & nested_type = array_type->getNestedType();
|
||||
if (!isNativeNumber(nested_type) && !isEnum(nested_type))
|
||||
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "{} cannot process values of type {}",
|
||||
getName(), nested_type->getName());
|
||||
}
|
||||
|
||||
return std::make_shared<DataTypeFloat64>();
|
||||
}
|
||||
|
||||
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t) const override
|
||||
{
|
||||
ColumnPtr col1 = arguments[0].column->convertToFullColumnIfConst();
|
||||
ColumnPtr col2 = arguments[1].column->convertToFullColumnIfConst();
|
||||
|
||||
const ColumnArray * col_array1 = checkAndGetColumn<ColumnArray>(col1.get());
|
||||
if (!col_array1)
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"Illegal column {} of first argument of function {}", arguments[0].column->getName(), getName());
|
||||
|
||||
const ColumnArray * col_array2 = checkAndGetColumn<ColumnArray>(col2.get());
|
||||
if (!col_array2)
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"Illegal column {} of second argument of function {}", arguments[1].column->getName(), getName());
|
||||
|
||||
if (!col_array1->hasEqualOffsets(*col_array2))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Array arguments for function {} must have equal sizes", getName());
|
||||
|
||||
auto col_res = ColumnVector<Float64>::create();
|
||||
|
||||
vector(
|
||||
col_array1->getData(),
|
||||
col_array2->getData(),
|
||||
col_array1->getOffsets(),
|
||||
col_res->getData());
|
||||
|
||||
return col_res;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// auc(array_score, array_label) - Calculate AUC with array of score and label
|
||||
using FunctionArrayAUC = FunctionArrayScalarProduct<ArrayAUCImpl, NameArrayAUC>;
|
||||
|
||||
REGISTER_FUNCTION(ArrayAUC)
|
||||
{
|
||||
factory.registerFunction<FunctionArrayAUC>();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -194,9 +194,9 @@ void ClientInfo::setInitialQuery()
|
||||
query_kind = QueryKind::INITIAL_QUERY;
|
||||
fillOSUserHostNameAndVersionInfo();
|
||||
if (client_name.empty())
|
||||
client_name = DBMS_NAME;
|
||||
client_name = VERSION_NAME;
|
||||
else
|
||||
client_name = (DBMS_NAME " ") + client_name;
|
||||
client_name = (VERSION_NAME " ") + client_name;
|
||||
}
|
||||
|
||||
|
||||
@ -210,9 +210,9 @@ void ClientInfo::fillOSUserHostNameAndVersionInfo()
|
||||
|
||||
client_hostname = getFQDNOrHostName();
|
||||
|
||||
client_version_major = DBMS_VERSION_MAJOR;
|
||||
client_version_minor = DBMS_VERSION_MINOR;
|
||||
client_version_patch = DBMS_VERSION_PATCH;
|
||||
client_version_major = VERSION_MAJOR;
|
||||
client_version_minor = VERSION_MINOR;
|
||||
client_version_patch = VERSION_PATCH;
|
||||
client_tcp_protocol_version = DBMS_TCP_PROTOCOL_VERSION;
|
||||
}
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include <string>
|
||||
#include <Interpreters/DatabaseCatalog.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/loadMetadata.h>
|
||||
@ -14,6 +15,7 @@
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Poco/DirectoryIterator.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/atomicRename.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
@ -23,6 +25,7 @@
|
||||
#include <Common/noexcept_scope.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
|
||||
#include "Interpreters/Context_fwd.h"
|
||||
#include "config.h"
|
||||
|
||||
#if USE_MYSQL
|
||||
@ -35,7 +38,6 @@
|
||||
# include <Storages/PostgreSQL/StorageMaterializedPostgreSQL.h>
|
||||
#endif
|
||||
|
||||
|
||||
namespace CurrentMetrics
|
||||
{
|
||||
extern const Metric TablesToDropQueueSize;
|
||||
@ -59,6 +61,29 @@ namespace ErrorCodes
|
||||
extern const int UNFINISHED;
|
||||
}
|
||||
|
||||
class DatabaseNameHints : public IHints<1, DatabaseNameHints>
|
||||
{
|
||||
public:
|
||||
explicit DatabaseNameHints(const DatabaseCatalog & database_catalog_)
|
||||
: database_catalog(database_catalog_)
|
||||
{
|
||||
}
|
||||
Names getAllRegisteredNames() const override
|
||||
{
|
||||
Names result;
|
||||
auto databases_list = database_catalog.getDatabases();
|
||||
for (const auto & database_name : databases_list | boost::adaptors::map_keys)
|
||||
{
|
||||
if (database_name == DatabaseCatalog::TEMPORARY_DATABASE)
|
||||
continue;
|
||||
result.emplace_back(database_name);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
private:
|
||||
const DatabaseCatalog & database_catalog;
|
||||
};
|
||||
|
||||
TemporaryTableHolder::TemporaryTableHolder(ContextPtr context_, const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
|
||||
: WithContext(context_->getGlobalContext())
|
||||
, temporary_tables(DatabaseCatalog::instance().getDatabaseForTemporaryTables().get())
|
||||
@ -313,7 +338,18 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
|
||||
{
|
||||
assert(!db_and_table.first && !db_and_table.second);
|
||||
if (exception)
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs()));
|
||||
{
|
||||
TableNameHints hints(this->tryGetDatabase(table_id.getDatabaseName()), getContext());
|
||||
std::vector<String> names = hints.getHints(table_id.getTableName());
|
||||
if (!names.empty())
|
||||
{
|
||||
/// There is two options: first is to print just the name of the table
|
||||
/// and the second is to print the result in format: db_name.table_name. I'll comment out the second option below
|
||||
/// I also leave possibility to print several suggestions
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist. Maybe you meant {}?", table_id.getNameForLogs(), backQuoteIfNeed(names[0])));
|
||||
}
|
||||
else exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_id.getNameForLogs()));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
@ -359,13 +395,26 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
|
||||
|
||||
std::lock_guard lock{databases_mutex};
|
||||
auto it = databases.find(table_id.getDatabaseName());
|
||||
if (databases.end() == it)
|
||||
if (databases.end() != it)
|
||||
database = it->second;
|
||||
}
|
||||
|
||||
if (!database)
|
||||
{
|
||||
if (exception)
|
||||
{
|
||||
if (exception)
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} doesn't exist", backQuoteIfNeed(table_id.getDatabaseName())));
|
||||
return {};
|
||||
DatabaseNameHints hints(*this);
|
||||
std::vector<String> names = hints.getHints(table_id.getDatabaseName());
|
||||
if (names.empty())
|
||||
{
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist", backQuoteIfNeed(table_id.getDatabaseName())));
|
||||
}
|
||||
else
|
||||
{
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist. Maybe you meant {}?", backQuoteIfNeed(table_id.getDatabaseName()), backQuoteIfNeed(names[0])));
|
||||
}
|
||||
}
|
||||
database = it->second;
|
||||
return {};
|
||||
}
|
||||
|
||||
StoragePtr table;
|
||||
@ -386,8 +435,18 @@ DatabaseAndTable DatabaseCatalog::getTableImpl(
|
||||
}
|
||||
|
||||
if (!table && exception && !exception->has_value())
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} doesn't exist", table_id.getNameForLogs()));
|
||||
|
||||
{
|
||||
TableNameHints hints(this->tryGetDatabase(table_id.getDatabaseName()), getContext());
|
||||
std::vector<String> names = hints.getHints(table_id.getTableName());
|
||||
if (names.empty())
|
||||
{
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist", table_id.getNameForLogs()));
|
||||
}
|
||||
else
|
||||
{
|
||||
exception->emplace(Exception(ErrorCodes::UNKNOWN_TABLE, "Table {} does not exist. Maybe you meant {}?", table_id.getNameForLogs(), backQuoteIfNeed(names[0])));
|
||||
}
|
||||
}
|
||||
if (!table)
|
||||
database = nullptr;
|
||||
|
||||
@ -438,8 +497,26 @@ bool DatabaseCatalog::isPredefinedTable(const StorageID & table_id) const
|
||||
|
||||
void DatabaseCatalog::assertDatabaseExists(const String & database_name) const
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
assertDatabaseExistsUnlocked(database_name);
|
||||
DatabasePtr db;
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
assert(!database_name.empty());
|
||||
if (auto it = databases.find(database_name); it != databases.end())
|
||||
db = it->second;
|
||||
}
|
||||
if (!db)
|
||||
{
|
||||
DatabaseNameHints hints(*this);
|
||||
std::vector<String> names = hints.getHints(database_name);
|
||||
if (names.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist", backQuoteIfNeed(database_name));
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist. Maybe you meant {}?", backQuoteIfNeed(database_name), backQuoteIfNeed(names[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DatabaseCatalog::assertDatabaseDoesntExist(const String & database_name) const
|
||||
@ -448,19 +525,11 @@ void DatabaseCatalog::assertDatabaseDoesntExist(const String & database_name) co
|
||||
assertDatabaseDoesntExistUnlocked(database_name);
|
||||
}
|
||||
|
||||
void DatabaseCatalog::assertDatabaseExistsUnlocked(const String & database_name) const
|
||||
{
|
||||
assert(!database_name.empty());
|
||||
if (databases.end() == databases.find(database_name))
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} doesn't exist", backQuoteIfNeed(database_name));
|
||||
}
|
||||
|
||||
|
||||
void DatabaseCatalog::assertDatabaseDoesntExistUnlocked(const String & database_name) const
|
||||
{
|
||||
assert(!database_name.empty());
|
||||
if (databases.end() != databases.find(database_name))
|
||||
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Database {} already exists.", backQuoteIfNeed(database_name));
|
||||
throw Exception(ErrorCodes::DATABASE_ALREADY_EXISTS, "Database {} already exists", backQuoteIfNeed(database_name));
|
||||
}
|
||||
|
||||
void DatabaseCatalog::attachDatabase(const String & database_name, const DatabasePtr & database)
|
||||
@ -480,18 +549,34 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri
|
||||
{
|
||||
if (database_name == TEMPORARY_DATABASE)
|
||||
throw Exception(ErrorCodes::DATABASE_ACCESS_DENIED, "Cannot detach database with temporary tables.");
|
||||
|
||||
assert(!database_name.empty());
|
||||
DatabasePtr db;
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
assertDatabaseExistsUnlocked(database_name);
|
||||
db = databases.find(database_name)->second;
|
||||
UUID db_uuid = db->getUUID();
|
||||
if (db_uuid != UUIDHelpers::Nil)
|
||||
removeUUIDMapping(db_uuid);
|
||||
databases.erase(database_name);
|
||||
}
|
||||
if (auto it = databases.find(database_name); it != databases.end())
|
||||
{
|
||||
db = it->second;
|
||||
|
||||
UUID db_uuid = db->getUUID();
|
||||
if (db_uuid != UUIDHelpers::Nil)
|
||||
removeUUIDMapping(db_uuid);
|
||||
databases.erase(database_name);
|
||||
|
||||
}
|
||||
}
|
||||
if (!db)
|
||||
{
|
||||
DatabaseNameHints hints(*this);
|
||||
std::vector<String> names = hints.getHints(database_name);
|
||||
if (names.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist", backQuoteIfNeed(database_name));
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist. Maybe you meant {}?", backQuoteIfNeed(database_name), backQuoteIfNeed(names[0]));
|
||||
}
|
||||
}
|
||||
if (check_empty)
|
||||
{
|
||||
try
|
||||
@ -527,7 +612,6 @@ DatabasePtr DatabaseCatalog::detachDatabase(ContextPtr local_context, const Stri
|
||||
if (db_uuid != UUIDHelpers::Nil)
|
||||
removeUUIDMappingFinally(db_uuid);
|
||||
}
|
||||
|
||||
return db;
|
||||
}
|
||||
|
||||
@ -553,9 +637,28 @@ void DatabaseCatalog::updateDatabaseName(const String & old_name, const String &
|
||||
|
||||
DatabasePtr DatabaseCatalog::getDatabase(const String & database_name) const
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
assertDatabaseExistsUnlocked(database_name);
|
||||
return databases.find(database_name)->second;
|
||||
assert(!database_name.empty());
|
||||
DatabasePtr db;
|
||||
{
|
||||
std::lock_guard lock{databases_mutex};
|
||||
if (auto it = databases.find(database_name); it != databases.end())
|
||||
db = it->second;
|
||||
}
|
||||
|
||||
if (!db)
|
||||
{
|
||||
DatabaseNameHints hints(*this);
|
||||
std::vector<String> names = hints.getHints(database_name);
|
||||
if (names.empty())
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist", backQuoteIfNeed(database_name));
|
||||
}
|
||||
else
|
||||
{
|
||||
throw Exception(ErrorCodes::UNKNOWN_DATABASE, "Database {} does not exist. Maybe you meant {}?", backQuoteIfNeed(database_name), backQuoteIfNeed(names[0]));
|
||||
}
|
||||
}
|
||||
return db;
|
||||
}
|
||||
|
||||
DatabasePtr DatabaseCatalog::tryGetDatabase(const String & database_name) const
|
||||
|
@ -6,7 +6,10 @@
|
||||
#include <Databases/TablesDependencyGraph.h>
|
||||
#include <Parsers/IAST_fwd.h>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
#include "Common/NamePrompter.h"
|
||||
#include <Common/SharedMutex.h>
|
||||
#include "Storages/IStorage.h"
|
||||
#include "Databases/IDatabase.h"
|
||||
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <Poco/Logger.h>
|
||||
@ -27,6 +30,32 @@ namespace fs = std::filesystem;
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class TableNameHints : public IHints<1, TableNameHints>
|
||||
{
|
||||
public:
|
||||
TableNameHints(ConstDatabasePtr database_, ContextPtr context_)
|
||||
: context(context_),
|
||||
database(database_)
|
||||
{
|
||||
}
|
||||
Names getAllRegisteredNames() const override
|
||||
{
|
||||
Names result;
|
||||
if (database)
|
||||
{
|
||||
for (auto table_it = database->getTablesIterator(context); table_it->isValid(); table_it->next())
|
||||
{
|
||||
const auto & storage_id = table_it->table()->getStorageID();
|
||||
result.emplace_back(storage_id.getTableName());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
private:
|
||||
ContextPtr context;
|
||||
ConstDatabasePtr database;
|
||||
};
|
||||
|
||||
class IDatabase;
|
||||
class Exception;
|
||||
class ColumnsDescription;
|
||||
@ -262,7 +291,6 @@ private:
|
||||
static std::unique_ptr<DatabaseCatalog> database_catalog;
|
||||
|
||||
explicit DatabaseCatalog(ContextMutablePtr global_context_);
|
||||
void assertDatabaseExistsUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
|
||||
void assertDatabaseDoesntExistUnlocked(const String & database_name) const TSA_REQUIRES(databases_mutex);
|
||||
|
||||
void shutdownImpl();
|
||||
|
@ -113,13 +113,14 @@ QueryTreeNodePtr prepareQueryAffectedQueryTree(const std::vector<MutationCommand
|
||||
ColumnDependencies getAllColumnDependencies(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
const NameSet & updated_columns,
|
||||
const std::function<bool(const String & file_name)> & has_index_or_projection)
|
||||
const StorageInMemoryMetadata::HasDependencyCallback & has_dependency)
|
||||
{
|
||||
NameSet new_updated_columns = updated_columns;
|
||||
ColumnDependencies dependencies;
|
||||
|
||||
while (!new_updated_columns.empty())
|
||||
{
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_index_or_projection);
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_dependency);
|
||||
new_updated_columns.clear();
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
@ -292,9 +293,14 @@ bool MutationsInterpreter::Source::materializeTTLRecalculateOnly() const
|
||||
return data && data->getSettings()->materialize_ttl_recalculate_only;
|
||||
}
|
||||
|
||||
bool MutationsInterpreter::Source::hasIndexOrProjection(const String & file_name) const
|
||||
bool MutationsInterpreter::Source::hasSecondaryIndex(const String & name) const
|
||||
{
|
||||
return part && part->checksums.has(file_name);
|
||||
return part && part->hasSecondaryIndex(name);
|
||||
}
|
||||
|
||||
bool MutationsInterpreter::Source::hasProjection(const String & name) const
|
||||
{
|
||||
return part && part->hasProjection(name);
|
||||
}
|
||||
|
||||
static Names getAvailableColumnsWithVirtuals(StorageMetadataPtr metadata_snapshot, const IStorage & storage)
|
||||
@ -533,13 +539,24 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
validateUpdateColumns(source, metadata_snapshot, updated_columns, column_to_affected_materialized);
|
||||
}
|
||||
|
||||
std::function<bool(const String & file_name)> has_index_or_projection
|
||||
= [&](const String & file_name) { return source.hasIndexOrProjection(file_name); };
|
||||
StorageInMemoryMetadata::HasDependencyCallback has_dependency =
|
||||
[&](const String & name, ColumnDependency::Kind kind)
|
||||
{
|
||||
if (kind == ColumnDependency::PROJECTION)
|
||||
return source.hasProjection(name);
|
||||
|
||||
if (kind == ColumnDependency::SKIP_INDEX)
|
||||
return source.hasSecondaryIndex(name);
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
if (settings.recalculate_dependencies_of_updated_columns)
|
||||
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns, has_index_or_projection);
|
||||
dependencies = getAllColumnDependencies(metadata_snapshot, updated_columns, has_dependency);
|
||||
|
||||
bool has_alter_delete = false;
|
||||
std::vector<String> read_columns;
|
||||
|
||||
/// First, break a sequence of commands into stages.
|
||||
for (auto & command : commands)
|
||||
{
|
||||
@ -558,6 +575,7 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
predicate = makeASTFunction("isZeroOrNull", predicate);
|
||||
|
||||
stages.back().filters.push_back(predicate);
|
||||
has_alter_delete = true;
|
||||
}
|
||||
else if (command.type == MutationCommand::UPDATE)
|
||||
{
|
||||
@ -692,8 +710,7 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
if (it == std::cend(indices_desc))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown index: {}", command.index_name);
|
||||
|
||||
if (!source.hasIndexOrProjection("skp_idx_" + it->name + ".idx")
|
||||
&& !source.hasIndexOrProjection("skp_idx_" + it->name + ".idx2"))
|
||||
if (!source.hasSecondaryIndex(it->name))
|
||||
{
|
||||
auto query = (*it).expression_list_ast->clone();
|
||||
auto syntax_result = TreeRewriter(context).analyze(query, all_columns);
|
||||
@ -707,7 +724,7 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
{
|
||||
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
|
||||
const auto & projection = projections_desc.get(command.projection_name);
|
||||
if (!source.hasIndexOrProjection(projection.getDirectoryName()))
|
||||
if (!source.hasProjection(projection.name))
|
||||
{
|
||||
for (const auto & column : projection.required_columns)
|
||||
dependencies.emplace(column, ColumnDependency::PROJECTION);
|
||||
@ -731,8 +748,9 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
{
|
||||
// just recalculate ttl_infos without remove expired data
|
||||
auto all_columns_vec = all_columns.getNames();
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(
|
||||
NameSet(all_columns_vec.begin(), all_columns_vec.end()), false, has_index_or_projection);
|
||||
auto all_columns_set = NameSet(all_columns_vec.begin(), all_columns_vec.end());
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(all_columns_set, false, has_dependency);
|
||||
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
if (dependency.kind == ColumnDependency::TTL_EXPRESSION)
|
||||
@ -757,8 +775,8 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
}
|
||||
|
||||
auto all_columns_vec = all_columns.getNames();
|
||||
auto all_dependencies = getAllColumnDependencies(
|
||||
metadata_snapshot, NameSet(all_columns_vec.begin(), all_columns_vec.end()), has_index_or_projection);
|
||||
auto all_columns_set = NameSet(all_columns_vec.begin(), all_columns_vec.end());
|
||||
auto all_dependencies = getAllColumnDependencies(metadata_snapshot, all_columns_set, has_dependency);
|
||||
|
||||
for (const auto & dependency : all_dependencies)
|
||||
{
|
||||
@ -767,7 +785,7 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
}
|
||||
|
||||
/// Recalc only skip indices and projections of columns which could be updated by TTL.
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_index_or_projection);
|
||||
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_dependency);
|
||||
for (const auto & dependency : new_dependencies)
|
||||
{
|
||||
if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION)
|
||||
@ -861,30 +879,44 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
|
||||
for (const auto & index : metadata_snapshot->getSecondaryIndices())
|
||||
{
|
||||
if (source.hasIndexOrProjection("skp_idx_" + index.name + ".idx") || source.hasIndexOrProjection("skp_idx_" + index.name + ".idx2"))
|
||||
if (!source.hasSecondaryIndex(index.name))
|
||||
continue;
|
||||
|
||||
if (has_alter_delete)
|
||||
{
|
||||
const auto & index_cols = index.expression->getRequiredColumns();
|
||||
bool changed = std::any_of(
|
||||
index_cols.begin(),
|
||||
index_cols.end(),
|
||||
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
|
||||
if (changed)
|
||||
materialized_indices.insert(index.name);
|
||||
materialized_indices.insert(index.name);
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto & index_cols = index.expression->getRequiredColumns();
|
||||
bool changed = std::any_of(
|
||||
index_cols.begin(),
|
||||
index_cols.end(),
|
||||
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
|
||||
|
||||
if (changed)
|
||||
materialized_indices.insert(index.name);
|
||||
}
|
||||
|
||||
for (const auto & projection : metadata_snapshot->getProjections())
|
||||
{
|
||||
if (source.hasIndexOrProjection(projection.getDirectoryName()))
|
||||
if (!source.hasProjection(projection.name))
|
||||
continue;
|
||||
|
||||
if (has_alter_delete)
|
||||
{
|
||||
const auto & projection_cols = projection.required_columns;
|
||||
bool changed = std::any_of(
|
||||
projection_cols.begin(),
|
||||
projection_cols.end(),
|
||||
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
|
||||
if (changed)
|
||||
materialized_projections.insert(projection.name);
|
||||
materialized_projections.insert(projection.name);
|
||||
continue;
|
||||
}
|
||||
|
||||
const auto & projection_cols = projection.required_columns;
|
||||
bool changed = std::any_of(
|
||||
projection_cols.begin(),
|
||||
projection_cols.end(),
|
||||
[&](const auto & col) { return updated_columns.contains(col) || changed_columns.contains(col); });
|
||||
|
||||
if (changed)
|
||||
materialized_projections.insert(projection.name);
|
||||
}
|
||||
|
||||
/// Stages might be empty when we materialize skip indices or projections which don't add any
|
||||
|
@ -120,7 +120,8 @@ public:
|
||||
bool supportsLightweightDelete() const;
|
||||
bool hasLightweightDeleteMask() const;
|
||||
bool materializeTTLRecalculateOnly() const;
|
||||
bool hasIndexOrProjection(const String & file_name) const;
|
||||
bool hasSecondaryIndex(const String & name) const;
|
||||
bool hasProjection(const String & name) const;
|
||||
|
||||
void read(
|
||||
Stage & first_stage,
|
||||
|
@ -520,8 +520,6 @@ ContextMutablePtr Session::makeSessionContext()
|
||||
{},
|
||||
session_context->getSettingsRef().max_sessions_for_user);
|
||||
|
||||
recordLoginSucess(session_context);
|
||||
|
||||
return session_context;
|
||||
}
|
||||
|
||||
@ -584,8 +582,6 @@ ContextMutablePtr Session::makeSessionContext(const String & session_name_, std:
|
||||
{ session_name_ },
|
||||
max_sessions_for_user);
|
||||
|
||||
recordLoginSucess(session_context);
|
||||
|
||||
return session_context;
|
||||
}
|
||||
|
||||
@ -659,35 +655,21 @@ ContextMutablePtr Session::makeQueryContextImpl(const ClientInfo * client_info_t
|
||||
if (user_id)
|
||||
user = query_context->getUser();
|
||||
|
||||
/// Interserver does not create session context
|
||||
recordLoginSucess(query_context);
|
||||
|
||||
return query_context;
|
||||
}
|
||||
|
||||
|
||||
void Session::recordLoginSucess(ContextPtr login_context) const
|
||||
{
|
||||
if (notified_session_log_about_login)
|
||||
return;
|
||||
|
||||
if (!login_context)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Session or query context must be created");
|
||||
|
||||
if (auto session_log = getSessionLog())
|
||||
if (!notified_session_log_about_login)
|
||||
{
|
||||
const auto & settings = login_context->getSettingsRef();
|
||||
const auto access = login_context->getAccess();
|
||||
if (auto session_log = getSessionLog())
|
||||
{
|
||||
session_log->addLoginSuccess(
|
||||
auth_id,
|
||||
named_session ? std::optional<std::string>(named_session->key.second) : std::nullopt,
|
||||
*query_context,
|
||||
user);
|
||||
|
||||
session_log->addLoginSuccess(auth_id,
|
||||
named_session ? named_session->key.second : "",
|
||||
settings,
|
||||
access,
|
||||
getClientInfo(),
|
||||
user);
|
||||
notified_session_log_about_login = true;
|
||||
}
|
||||
}
|
||||
|
||||
notified_session_log_about_login = true;
|
||||
return query_context;
|
||||
}
|
||||
|
||||
|
||||
|
@ -97,8 +97,6 @@ public:
|
||||
private:
|
||||
std::shared_ptr<SessionLog> getSessionLog() const;
|
||||
ContextMutablePtr makeQueryContextImpl(const ClientInfo * client_info_to_copy, ClientInfo * client_info_to_move) const;
|
||||
void recordLoginSucess(ContextPtr login_context) const;
|
||||
|
||||
|
||||
mutable bool notified_session_log_about_login = false;
|
||||
const UUID auth_id;
|
||||
|
@ -199,13 +199,12 @@ void SessionLogElement::appendToBlock(MutableColumns & columns) const
|
||||
columns[i++]->insertData(auth_failure_reason.data(), auth_failure_reason.length());
|
||||
}
|
||||
|
||||
void SessionLog::addLoginSuccess(const UUID & auth_id,
|
||||
const String & session_id,
|
||||
const Settings & settings,
|
||||
const ContextAccessPtr & access,
|
||||
const ClientInfo & client_info,
|
||||
const UserPtr & login_user)
|
||||
void SessionLog::addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user)
|
||||
{
|
||||
const auto access = login_context.getAccess();
|
||||
const auto & settings = login_context.getSettingsRef();
|
||||
const auto & client_info = login_context.getClientInfo();
|
||||
|
||||
DB::SessionLogElement log_entry(auth_id, SESSION_LOGIN_SUCCESS);
|
||||
log_entry.client_info = client_info;
|
||||
|
||||
@ -216,7 +215,8 @@ void SessionLog::addLoginSuccess(const UUID & auth_id,
|
||||
}
|
||||
log_entry.external_auth_server = login_user ? login_user->auth_data.getLDAPServerName() : "";
|
||||
|
||||
log_entry.session_id = session_id;
|
||||
if (session_id)
|
||||
log_entry.session_id = *session_id;
|
||||
|
||||
if (const auto roles_info = access->getRolesInfo())
|
||||
log_entry.roles = roles_info->getCurrentRolesNames();
|
||||
|
@ -20,7 +20,6 @@ enum SessionLogElementType : int8_t
|
||||
class ContextAccess;
|
||||
struct User;
|
||||
using UserPtr = std::shared_ptr<const User>;
|
||||
using ContextAccessPtr = std::shared_ptr<const ContextAccess>;
|
||||
|
||||
/** A struct which will be inserted as row into session_log table.
|
||||
*
|
||||
@ -73,13 +72,7 @@ class SessionLog : public SystemLog<SessionLogElement>
|
||||
using SystemLog<SessionLogElement>::SystemLog;
|
||||
|
||||
public:
|
||||
void addLoginSuccess(const UUID & auth_id,
|
||||
const String & session_id,
|
||||
const Settings & settings,
|
||||
const ContextAccessPtr & access,
|
||||
const ClientInfo & client_info,
|
||||
const UserPtr & login_user);
|
||||
|
||||
void addLoginSuccess(const UUID & auth_id, std::optional<String> session_id, const Context & login_context, const UserPtr & login_user);
|
||||
void addLoginFailure(const UUID & auth_id, const ClientInfo & info, const std::optional<String> & user, const Exception & reason);
|
||||
void addLogOut(const UUID & auth_id, const UserPtr & login_user, const ClientInfo & client_info);
|
||||
};
|
||||
|
@ -75,7 +75,7 @@ std::unique_ptr<QueryPlan> createLocalPlan(
|
||||
new_context->setClientInterface(ClientInfo::Interface::LOCAL);
|
||||
new_context->setQueryKind(ClientInfo::QueryKind::SECONDARY_QUERY);
|
||||
new_context->setReplicaInfo(true, replica_count, replica_num);
|
||||
new_context->setConnectionClientVersion(DBMS_VERSION_MAJOR, DBMS_VERSION_MINOR, DBMS_VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION);
|
||||
new_context->setConnectionClientVersion(VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH, DBMS_TCP_PROTOCOL_VERSION);
|
||||
new_context->setParallelReplicasGroupUUID(group_uuid);
|
||||
new_context->setMergeTreeAllRangesCallback([coordinator](InitialAllRangesAnnouncement announcement)
|
||||
{
|
||||
|
@ -561,7 +561,8 @@ void HTTPHandler::processQuery(
|
||||
session->makeSessionContext();
|
||||
}
|
||||
|
||||
auto context = session->makeQueryContext();
|
||||
auto client_info = session->getClientInfo();
|
||||
auto context = session->makeQueryContext(std::move(client_info));
|
||||
|
||||
/// This parameter is used to tune the behavior of output formats (such as Native) for compatibility.
|
||||
if (params.has("client_protocol_version"))
|
||||
|
@ -132,21 +132,25 @@ void addCommonDefaultHandlersFactory(HTTPRequestHandlerFactoryMain & factory, IS
|
||||
auto ping_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<StaticRequestHandler>>(server, ping_response_expression);
|
||||
ping_handler->attachStrictPath("/ping");
|
||||
ping_handler->allowGetAndHeadRequest();
|
||||
factory.addPathToHints("/ping");
|
||||
factory.addHandler(ping_handler);
|
||||
|
||||
auto replicas_status_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<ReplicasStatusHandler>>(server);
|
||||
replicas_status_handler->attachNonStrictPath("/replicas_status");
|
||||
replicas_status_handler->allowGetAndHeadRequest();
|
||||
factory.addPathToHints("/replicas_status");
|
||||
factory.addHandler(replicas_status_handler);
|
||||
|
||||
auto play_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<WebUIRequestHandler>>(server);
|
||||
play_handler->attachNonStrictPath("/play");
|
||||
play_handler->allowGetAndHeadRequest();
|
||||
factory.addPathToHints("/play");
|
||||
factory.addHandler(play_handler);
|
||||
|
||||
auto dashboard_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<WebUIRequestHandler>>(server);
|
||||
dashboard_handler->attachNonStrictPath("/dashboard");
|
||||
dashboard_handler->allowGetAndHeadRequest();
|
||||
factory.addPathToHints("/dashboard");
|
||||
factory.addHandler(dashboard_handler);
|
||||
|
||||
auto js_handler = std::make_shared<HandlingRuleHTTPHandlerFactory<WebUIRequestHandler>>(server);
|
||||
|
src/Server/HTTPPathHints.cpp (new file, 16 lines)
@@ -0,0 +1,16 @@
#include <Server/HTTPPathHints.h>

namespace DB
{

void HTTPPathHints::add(const String & http_path)
{
    http_paths.push_back(http_path);
}

std::vector<String> HTTPPathHints::getAllRegisteredNames() const
{
    return http_paths;
}

}
src/Server/HTTPPathHints.h (new file, 22 lines)
@@ -0,0 +1,22 @@
#pragma once

#include <base/types.h>

#include <Common/NamePrompter.h>

namespace DB
{

class HTTPPathHints : public IHints<1, HTTPPathHints>
{
public:
    std::vector<String> getAllRegisteredNames() const override;
    void add(const String & http_path);

private:
    std::vector<String> http_paths;
};

using HTTPPathHintsPtr = std::shared_ptr<HTTPPathHints>;

}
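Taken together with the handler-factory changes below, this new header is what lets the HTTP server suggest near-miss paths. A hedged usage sketch (assuming the ClickHouse source tree; suggestFor is a hypothetical helper, and getHints is inherited from IHints):

#include <Server/HTTPPathHints.h>

#include <string>
#include <vector>

/// Illustrative sketch only (not part of the commit): register the built-in
/// handler paths and ask for suggestions for a mistyped URI, the same way
/// HTTPRequestHandlerFactoryMain::createRequestHandler() does further down.
std::vector<std::string> suggestFor(const std::string & uri)
{
    DB::HTTPPathHints hints;
    hints.add("/ping");
    hints.add("/replicas_status");
    hints.add("/dashboard");
    /// getHints() returns the registered paths closest to `uri`; NotFoundHandler
    /// renders the first one as "Maybe you meant /ping.".
    return hints.getHints(uri);
}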
@ -29,7 +29,7 @@ std::unique_ptr<HTTPRequestHandler> HTTPRequestHandlerFactoryMain::createRequest
|
||||
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD
|
||||
|| request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
|
||||
{
|
||||
return std::unique_ptr<HTTPRequestHandler>(new NotFoundHandler);
|
||||
return std::unique_ptr<HTTPRequestHandler>(new NotFoundHandler(hints.getHints(request.getURI())));
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Server/HTTP/HTTPRequestHandlerFactory.h>
|
||||
#include <Server/HTTPPathHints.h>
|
||||
|
||||
#include <vector>
|
||||
|
||||
@ -15,11 +16,14 @@ public:
|
||||
|
||||
void addHandler(HTTPRequestHandlerFactoryPtr child_factory) { child_factories.emplace_back(child_factory); }
|
||||
|
||||
void addPathToHints(const std::string & http_path) { hints.add(http_path); }
|
||||
|
||||
std::unique_ptr<HTTPRequestHandler> createRequestHandler(const HTTPServerRequest & request) override;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
std::string name;
|
||||
HTTPPathHints hints;
|
||||
|
||||
std::vector<HTTPRequestHandlerFactoryPtr> child_factories;
|
||||
};
|
||||
|
@ -10,7 +10,8 @@ void NotFoundHandler::handleRequest(HTTPServerRequest & request, HTTPServerRespo
|
||||
try
|
||||
{
|
||||
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_NOT_FOUND);
|
||||
*response.send() << "There is no handle " << request.getURI() << "\n\n"
|
||||
*response.send() << "There is no handle " << request.getURI()
|
||||
<< (!hints.empty() ? fmt::format(". Maybe you meant {}.", hints.front()) : "") << "\n\n"
|
||||
<< "Use / or /ping for health checks.\n"
|
||||
<< "Or /replicas_status for more sophisticated health checks.\n\n"
|
||||
<< "Send queries from your program with POST method or GET /?query=...\n\n"
|
||||
|
@ -9,7 +9,10 @@ namespace DB
|
||||
class NotFoundHandler : public HTTPRequestHandler
|
||||
{
|
||||
public:
|
||||
NotFoundHandler(std::vector<std::string> hints_) : hints(std::move(hints_)) {}
|
||||
void handleRequest(HTTPServerRequest & request, HTTPServerResponse & response) override;
|
||||
private:
|
||||
std::vector<std::string> hints;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -2,10 +2,10 @@
|
||||
|
||||
#include <vector>
|
||||
#include <algorithm>
|
||||
#include <base/types.h>
|
||||
|
||||
#include <magic_enum.hpp>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
|
@ -1,13 +1,13 @@
|
||||
#pragma once
|
||||
|
||||
#include <base/types.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class ServerType
|
||||
{
|
||||
public:
|
||||
|
||||
enum Type
|
||||
{
|
||||
TCP,
|
||||
|
@ -1303,16 +1303,16 @@ void TCPHandler::receiveUnexpectedHello()
|
||||
void TCPHandler::sendHello()
|
||||
{
|
||||
writeVarUInt(Protocol::Server::Hello, *out);
|
||||
writeStringBinary(DBMS_NAME, *out);
|
||||
writeVarUInt(DBMS_VERSION_MAJOR, *out);
|
||||
writeVarUInt(DBMS_VERSION_MINOR, *out);
|
||||
writeStringBinary(VERSION_NAME, *out);
|
||||
writeVarUInt(VERSION_MAJOR, *out);
|
||||
writeVarUInt(VERSION_MINOR, *out);
|
||||
writeVarUInt(DBMS_TCP_PROTOCOL_VERSION, *out);
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_TIMEZONE)
|
||||
writeStringBinary(DateLUT::instance().getTimeZone(), *out);
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME)
|
||||
writeStringBinary(server_display_name, *out);
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_REVISION_WITH_VERSION_PATCH)
|
||||
writeVarUInt(DBMS_VERSION_PATCH, *out);
|
||||
writeVarUInt(VERSION_PATCH, *out);
|
||||
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PASSWORD_COMPLEXITY_RULES)
|
||||
{
|
||||
auto rules = server.context()->getAccessControl().getPasswordComplexityRules();
|
||||
|
@ -1983,6 +1983,12 @@ IndexSize IMergeTreeDataPart::getSecondaryIndexSize(const String & secondary_ind
|
||||
return ColumnSize{};
|
||||
}
|
||||
|
||||
bool IMergeTreeDataPart::hasSecondaryIndex(const String & index_name) const
|
||||
{
|
||||
auto file_name = INDEX_FILE_PREFIX + index_name;
|
||||
return checksums.has(file_name + ".idx") || checksums.has(file_name + ".idx2");
|
||||
}
|
||||
|
||||
void IMergeTreeDataPart::accumulateColumnSizes(ColumnToSize & column_to_size) const
|
||||
{
|
||||
for (const auto & [column_name, size] : columns_sizes)
|
||||
|
@ -122,6 +122,9 @@ public:
|
||||
/// Otherwise return information about secondary index size on disk.
|
||||
IndexSize getSecondaryIndexSize(const String & secondary_index_name) const;
|
||||
|
||||
/// Returns true if there is materialized index with specified name in part.
|
||||
bool hasSecondaryIndex(const String & index_name) const;
|
||||
|
||||
/// Return information about column size on disk for all columns in part
|
||||
ColumnSize getTotalColumnsSize() const { return total_columns_size; }
|
||||
|
||||
|
@@ -453,6 +453,7 @@ static ExecuteTTLType shouldExecuteTTL(const StorageMetadataPtr & metadata_snaps
/// Return set of indices which should be recalculated during mutation also
/// wraps input stream into additional expression stream
static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
    const MergeTreeDataPartPtr & source_part,
    QueryPipelineBuilder & builder,
    const StorageMetadataPtr & metadata_snapshot,
    ContextPtr context,
@@ -463,10 +464,15 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
    std::set<MergeTreeIndexPtr> indices_to_recalc;
    ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
    const auto & indices = metadata_snapshot->getSecondaryIndices();
    bool is_full_part_storage = isFullPartStorage(source_part->getDataPartStorage());

    for (const auto & index : indices)
    {
        if (materialized_indices.contains(index.name))
        bool need_recalculate =
            materialized_indices.contains(index.name)
            || (!is_full_part_storage && source_part->hasSecondaryIndex(index.name));

        if (need_recalculate)
        {
            if (indices_to_recalc.insert(index_factory.get(index)).second)
            {
@@ -496,15 +502,23 @@ static std::set<MergeTreeIndexPtr> getIndicesToRecalculate(
}

static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
    const MergeTreeDataPartPtr & source_part,
    const StorageMetadataPtr & metadata_snapshot,
    const NameSet & materialized_projections)
{
    std::set<ProjectionDescriptionRawPtr> projections_to_recalc;
    bool is_full_part_storage = isFullPartStorage(source_part->getDataPartStorage());

    for (const auto & projection : metadata_snapshot->getProjections())
    {
        if (materialized_projections.contains(projection.name))
        bool need_recalculate =
            materialized_projections.contains(projection.name)
            || (!is_full_part_storage && source_part->hasProjection(projection.name));

        if (need_recalculate)
            projections_to_recalc.insert(&projection);
    }

    return projections_to_recalc;
}

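The change above makes a mutation rebuild a skip index or projection not only when it is explicitly listed in materialized_indices / materialized_projections, but also when the source part is stored in a non-full format and already carries that object (my reading: such parts cannot simply reuse the existing files). A hedged Python sketch of the decision rule, purely illustrative of the C++ logic:

    def need_recalculate(name, materialized, is_full_part_storage, part_has_object):
        # Rebuild when explicitly requested (MATERIALIZE INDEX / PROJECTION), or when
        # the part is not stored in the full (wide) format but already has the object.
        return name in materialized or (not is_full_part_storage and part_has_object(name))

    # A compact part that already has skip index "i" must rebuild it during a mutation:
    assert need_recalculate("i", set(), False, lambda n: n == "i")
    # A wide part only rebuilds what was explicitly materialized:
    assert not need_recalculate("i", set(), True, lambda n: n == "i")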
@@ -1279,14 +1293,20 @@ private:
            removed_indices.insert(command.column_name);
    }

    bool is_full_part_storage = isFullPartStorage(ctx->new_data_part->getDataPartStorage());
    const auto & indices = ctx->metadata_snapshot->getSecondaryIndices();

    MergeTreeIndices skip_indices;
    for (const auto & idx : indices)
    {
        if (removed_indices.contains(idx.name))
            continue;

        if (ctx->materialized_indices.contains(idx.name))
        bool need_recalculate =
            ctx->materialized_indices.contains(idx.name)
            || (!is_full_part_storage && ctx->source_part->hasSecondaryIndex(idx.name));

        if (need_recalculate)
        {
            skip_indices.push_back(MergeTreeIndexFactory::instance().get(idx));
        }
@@ -1319,7 +1339,11 @@ private:
        if (removed_projections.contains(projection.name))
            continue;

        if (ctx->materialized_projections.contains(projection.name))
        bool need_recalculate =
            ctx->materialized_projections.contains(projection.name)
            || (!is_full_part_storage && ctx->source_part->hasProjection(projection.name));

        if (need_recalculate)
        {
            ctx->projections_to_build.push_back(&projection);
        }
@@ -1921,9 +1945,16 @@ bool MutateTask::prepare()
        else /// TODO: check that we modify only non-key columns in this case.
        {
            ctx->indices_to_recalc = MutationHelpers::getIndicesToRecalculate(
                ctx->mutating_pipeline_builder, ctx->metadata_snapshot, ctx->context, ctx->materialized_indices);
                ctx->source_part,
                ctx->mutating_pipeline_builder,
                ctx->metadata_snapshot,
                ctx->context,
                ctx->materialized_indices);

            ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(ctx->metadata_snapshot, ctx->materialized_projections);
            ctx->projections_to_recalc = MutationHelpers::getProjectionsToRecalculate(
                ctx->source_part,
                ctx->metadata_snapshot,
                ctx->materialized_projections);

            ctx->files_to_skip = MutationHelpers::collectFilesToSkip(
                ctx->source_part,
@@ -239,7 +239,7 @@ bool StorageInMemoryMetadata::hasAnyGroupByTTL() const
ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(
    const NameSet & updated_columns,
    bool include_ttl_target,
    const std::function<bool(const String & file_name)> & has_indice_or_projection) const
    const HasDependencyCallback & has_dependency) const
{
    if (updated_columns.empty())
        return {};
@@ -268,13 +268,13 @@ ColumnDependencies StorageInMemoryMetadata::getColumnDependencies(

    for (const auto & index : getSecondaryIndices())
    {
        if (has_indice_or_projection("skp_idx_" + index.name + ".idx") || has_indice_or_projection("skp_idx_" + index.name + ".idx2"))
        if (has_dependency(index.name, ColumnDependency::SKIP_INDEX))
            add_dependent_columns(index.expression, indices_columns);
    }

    for (const auto & projection : getProjections())
    {
        if (has_indice_or_projection(projection.getDirectoryName()))
        if (has_dependency(projection.name, ColumnDependency::PROJECTION))
            add_dependent_columns(&projection, projections_columns);
    }

@@ -147,12 +147,14 @@ struct StorageInMemoryMetadata
    TTLDescriptions getGroupByTTLs() const;
    bool hasAnyGroupByTTL() const;

    using HasDependencyCallback = std::function<bool(const String &, ColumnDependency::Kind)>;

    /// Returns columns, which will be needed to calculate dependencies (skip indices, projections,
    /// TTL expressions) if we update @updated_columns set of columns.
    ColumnDependencies getColumnDependencies(
        const NameSet & updated_columns,
        bool include_ttl_target,
        const std::function<bool(const String & file_name)> & has_indice_or_projection) const;
        const HasDependencyCallback & has_dependency) const;

    /// Block with ordinary + materialized columns.
    Block getSampleBlock() const;
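getColumnDependencies now takes a HasDependencyCallback keyed by logical name and dependency kind instead of a file-name predicate, so callers no longer reconstruct on-disk names like "skp_idx_*.idx" at every call site. A rough Python analogue of the new callback shape, with the on-disk naming shown as an assumption for illustration:

    from enum import Enum, auto

    class Kind(Enum):
        SKIP_INDEX = auto()
        PROJECTION = auto()

    def make_has_dependency(part_files):
        # Answer by logical name and kind; the file naming ("skp_idx_<name>.idx"/".idx2",
        # "<name>.proj") is an assumption kept in one place rather than at call sites.
        def has_dependency(name: str, kind: Kind) -> bool:
            if kind is Kind.SKIP_INDEX:
                return (f"skp_idx_{name}.idx" in part_files
                        or f"skp_idx_{name}.idx2" in part_files)
            return f"{name}.proj" in part_files
        return has_dependency

    has_dep = make_has_dependency({"skp_idx_i.idx2"})
    assert has_dep("i", Kind.SKIP_INDEX) and not has_dep("p", Kind.PROJECTION)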
@@ -8,7 +8,6 @@ const char * auto_config_build[]
    "SYSTEM", "@CMAKE_SYSTEM_NAME@",
    "VERSION_GITHASH", "@VERSION_GITHASH@",
    "VERSION_REVISION", "@VERSION_REVISION@",
    "VERSION_DATE", "@VERSION_DATE@",
    "BUILD_TYPE", "@CMAKE_BUILD_TYPE@",
    "SYSTEM_PROCESSOR", "@CMAKE_SYSTEM_PROCESSOR@",
    "CMAKE_VERSION", "@CMAKE_VERSION@",
@@ -139,6 +139,7 @@ test_quota/test.py::test_tracking_quota
test_quota/test.py::test_users_xml_is_readonly
test_replicating_constants/test.py::test_different_versions
test_merge_tree_s3/test.py::test_heavy_insert_select_check_memory[node]
test_wrong_db_or_table_name/test.py::test_wrong_table_name
test_drop_is_lock_free/test.py::test_query_is_lock_free[detach table]
test_odbc_interaction/test.py::test_postgres_insert
test_zookeeper_config/test.py::test_chroot_with_different_root
@@ -10,6 +10,7 @@ import sys
import time

from ci_config import CI_CONFIG, BuildConfig
from ccache_utils import CargoCache
from docker_pull_helper import get_image_with_version
from env_helper import (
    GITHUB_JOB,
@@ -53,6 +54,7 @@ def get_packager_cmd(
    build_config: BuildConfig,
    packager_path: str,
    output_path: Path,
    cargo_cache_dir: Path,
    build_version: str,
    image_version: str,
    official: bool,
@@ -75,6 +77,7 @@ def get_packager_cmd(
        cmd += " --cache=sccache"
        cmd += " --s3-rw-access"
        cmd += f" --s3-bucket={S3_BUILDS_BUCKET}"
        cmd += f" --cargo-cache-dir={cargo_cache_dir}"

    if build_config.additional_pkgs:
        cmd += " --additional-pkgs"
@@ -287,11 +290,16 @@ def main():

    build_output_path = temp_path / build_name
    os.makedirs(build_output_path, exist_ok=True)
    cargo_cache = CargoCache(
        temp_path / "cargo_cache" / "registry", temp_path, s3_helper
    )
    cargo_cache.download()

    packager_cmd = get_packager_cmd(
        build_config,
        os.path.join(REPO_COPY, "docker/packager"),
        build_output_path,
        cargo_cache.directory,
        version.string,
        image_version,
        official_flag,
@@ -309,6 +317,9 @@ def main():
        f"sudo chown -R ubuntu:ubuntu {build_output_path}", shell=True
    )
    logging.info("Build finished with %s, log path %s", success, log_path)
    if success:
        cargo_cache.upload()

    if not success:
        # We check if docker works, because if it's down, it's infrastructure
        try:
@@ -16,6 +16,10 @@ from ci_config import CI_CONFIG
DOWNLOAD_RETRIES_COUNT = 5


class DownloadException(Exception):
    pass


def get_with_retries(
    url: str,
    retries: int = DOWNLOAD_RETRIES_COUNT,
@@ -149,7 +153,9 @@ def download_build_with_progress(url: str, path: Path) -> None:
            if os.path.exists(path):
                os.remove(path)
    else:
        raise Exception(f"Cannot download dataset from {url}, all retries exceeded")
        raise DownloadException(
            f"Cannot download dataset from {url}, all retries exceeded"
        )

    if sys.stdout.isatty():
        sys.stdout.write("\n")
@@ -174,7 +180,7 @@ def download_builds_filter(
    print(urls)

    if not urls:
        raise Exception("No build URLs found")
        raise DownloadException("No build URLs found")

    download_builds(result_path, urls, filter_fn)

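With the dedicated DownloadException, callers can treat a failed artifact download differently from unrelated errors instead of catching bare Exception. A minimal usage sketch; the URL and path are placeholders:

    from pathlib import Path

    from build_download_helper import DownloadException, download_build_with_progress

    try:
        download_build_with_progress(
            "https://example.com/builds/clickhouse.tgz",  # placeholder URL
            Path("/tmp/clickhouse.tgz"),
        )
    except DownloadException:
        # A missing artifact can now be handled separately from other failures.
        print("build artifact not available, falling back")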
@@ -1,71 +1,31 @@
#!/usr/bin/env python3

import logging
import time
import sys
import os
import shutil
from hashlib import md5
from pathlib import Path

import requests  # type: ignore

from build_download_helper import download_build_with_progress, DownloadException
from compress_files import decompress_fast, compress_fast
from env_helper import S3_DOWNLOAD, S3_BUILDS_BUCKET
from git_helper import git_runner
from s3_helper import S3Helper

DOWNLOAD_RETRIES_COUNT = 5


def dowload_file_with_progress(url, path):
    logging.info("Downloading from %s to temp path %s", url, path)
    for i in range(DOWNLOAD_RETRIES_COUNT):
        try:
            with open(path, "wb") as f:
                response = requests.get(url, stream=True)
                response.raise_for_status()
                total_length = response.headers.get("content-length")
                if total_length is None or int(total_length) == 0:
                    logging.info(
                        "No content-length, will download file without progress"
                    )
                    f.write(response.content)
                else:
                    dl = 0
                    total_length = int(total_length)
                    logging.info("Content length is %ld bytes", total_length)
                    for data in response.iter_content(chunk_size=4096):
                        dl += len(data)
                        f.write(data)
                        if sys.stdout.isatty():
                            done = int(50 * dl / total_length)
                            percent = int(100 * float(dl) / total_length)
                            eq_str = "=" * done
                            space_str = " " * (50 - done)
                            sys.stdout.write(f"\r[{eq_str}{space_str}] {percent}%")
                            sys.stdout.flush()
            break
        except Exception as ex:
            sys.stdout.write("\n")
            time.sleep(3)
            logging.info("Exception while downloading %s, retry %s", ex, i + 1)
            if os.path.exists(path):
                os.remove(path)
    else:
        raise Exception(f"Cannot download dataset from {url}, all retries exceeded")

    sys.stdout.write("\n")
    logging.info("Downloading finished")


def get_ccache_if_not_exists(
    path_to_ccache_dir: str,
    path_to_ccache_dir: Path,
    s3_helper: S3Helper,
    current_pr_number: int,
    temp_path: str,
    temp_path: Path,
    release_pr: int,
) -> int:
    """returns: number of PR for downloaded PR. -1 if ccache not found"""
    ccache_name = os.path.basename(path_to_ccache_dir)
    ccache_name = path_to_ccache_dir.name
    cache_found = False
    prs_to_check = [current_pr_number]
    # Release PR is either 0 or defined
@@ -94,11 +54,11 @@ def get_ccache_if_not_exists(

            logging.info("Found ccache on path %s", obj)
            url = f"{S3_DOWNLOAD}/{S3_BUILDS_BUCKET}/{obj}"
            compressed_cache = os.path.join(temp_path, os.path.basename(obj))
            dowload_file_with_progress(url, compressed_cache)
            compressed_cache = temp_path / os.path.basename(obj)
            download_build_with_progress(url, compressed_cache)

            path_to_decompress = str(Path(path_to_ccache_dir).parent)
            if not os.path.exists(path_to_decompress):
            path_to_decompress = path_to_ccache_dir.parent
            if not path_to_decompress.exists():
                os.makedirs(path_to_decompress)

            if os.path.exists(path_to_ccache_dir):
@@ -122,15 +82,77 @@ def get_ccache_if_not_exists(
    return ccache_pr


def upload_ccache(path_to_ccache_dir, s3_helper, current_pr_number, temp_path):
def upload_ccache(
    path_to_ccache_dir: Path,
    s3_helper: S3Helper,
    current_pr_number: int,
    temp_path: Path,
) -> None:
    logging.info("Uploading cache %s for pr %s", path_to_ccache_dir, current_pr_number)
    ccache_name = os.path.basename(path_to_ccache_dir)
    compressed_cache_path = os.path.join(temp_path, ccache_name + ".tar.zst")
    ccache_name = path_to_ccache_dir.name
    compressed_cache_path = temp_path / f"{ccache_name}.tar.zst"
    compress_fast(path_to_ccache_dir, compressed_cache_path)

    s3_path = (
        str(current_pr_number) + "/ccaches/" + os.path.basename(compressed_cache_path)
    )
    s3_path = f"{current_pr_number}/ccaches/{compressed_cache_path.name}"
    logging.info("Will upload %s to path %s", compressed_cache_path, s3_path)
    s3_helper.upload_build_file_to_s3(compressed_cache_path, s3_path)
    logging.info("Upload finished")


class CargoCache:
    PREFIX = "ccache/cargo_cache"

    def __init__(
        self,
        directory: Path,
        temp_path: Path,
        s3_helper: S3Helper,
    ):
        self._cargo_lock_file = Path(git_runner.cwd) / "rust" / "Cargo.lock"
        self.lock_hash = md5(self._cargo_lock_file.read_bytes()).hexdigest()
        self.directory = directory
        self.archive_name = f"Cargo_cache_{self.lock_hash}.tar.zst"
        self.temp_path = temp_path
        self.s3_helper = s3_helper
        self._url = (
            f"{S3_DOWNLOAD}/{S3_BUILDS_BUCKET}/{self.PREFIX}/{self.archive_name}"
        )
        self._force_upload_cache = False

    def download(self):
        logging.info("Searching rust cache for Cargo.lock md5 %s", self.lock_hash)
        compressed_cache = self.temp_path / self.archive_name
        try:
            download_build_with_progress(self._url, compressed_cache)
        except DownloadException:
            logging.warning("Unable to download cargo cache, creating empty directory")
            self.directory.mkdir(parents=True, exist_ok=True)
            return

        # decompress the cache and check if the necessary directory is there
        self.directory.parent.mkdir(parents=True, exist_ok=True)
        decompress_fast(compressed_cache, self.directory.parent)
        if not self.directory.exists():
            logging.warning(
                "The cargo cache archive was successfully downloaded and "
                "decompressed, but %s does not exist. Creating empty one",
                self.directory,
            )
            logging.info("Cache for Cargo.lock md5 %s will be uploaded", self.lock_hash)
            self.directory.mkdir(parents=True, exist_ok=True)

    def upload(self):
        if not self._force_upload_cache:
            cache_response = requests.head(self._url)
            if cache_response.status_code == 200:
                logging.info(
                    "Remote cargo cache %s already exists, won't reupload", self._url
                )
                return

        logging.info("Compressing cargo cache")
        archive_path = self.directory.parent / self.archive_name
        compress_fast(self.directory, archive_path)
        s3_path = f"{self.PREFIX}/{self.archive_name}"
        logging.info("Uploading %s to S3 path %s", archive_path, s3_path)
        self.s3_helper.upload_build_file_to_s3(archive_path, s3_path)
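CargoCache keys its archive on the md5 of rust/Cargo.lock, so a dependency change produces a new cache object while unchanged lock files reuse the previous one, and upload() skips the work when that object already exists in S3. A sketch of how a build job is expected to drive it, mirroring the build_check.py wiring above; the paths, S3Helper construction, and run_build helper are illustrative placeholders:

    from pathlib import Path

    from ccache_utils import CargoCache
    from s3_helper import S3Helper

    def run_build(registry_dir: Path) -> bool:
        # Placeholder for the real packager invocation; the actual job passes
        # cargo_cache.directory to get_packager_cmd via --cargo-cache-dir.
        return True

    temp_path = Path("/tmp/build")
    cargo_cache = CargoCache(temp_path / "cargo_cache" / "registry", temp_path, S3Helper())

    cargo_cache.download()              # reuse the archive for this Cargo.lock md5, if any
    if run_build(cargo_cache.directory):
        cargo_cache.upload()            # no-op when the same hash is already in S3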
@ -1,24 +1,31 @@
|
||||
#!/usr/bin/env python3
|
||||
import subprocess
|
||||
import logging
|
||||
import os
|
||||
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
|
||||
def compress_file_fast(path, archive_path):
|
||||
if archive_path.endswith(".zst"):
|
||||
PIGZ = Path("/usr/bin/pigz")
|
||||
|
||||
|
||||
def compress_file_fast(path: Path, archive_path: Path) -> None:
|
||||
if archive_path.suffix == ".zst":
|
||||
subprocess.check_call(f"zstd < {path} > {archive_path}", shell=True)
|
||||
elif os.path.exists("/usr/bin/pigz"):
|
||||
elif PIGZ.exists():
|
||||
subprocess.check_call(f"pigz < {path} > {archive_path}", shell=True)
|
||||
else:
|
||||
subprocess.check_call(f"gzip < {path} > {archive_path}", shell=True)
|
||||
|
||||
|
||||
def compress_fast(path, archive_path, exclude=None):
|
||||
def compress_fast(
|
||||
path: Path, archive_path: Path, exclude: Optional[Path] = None
|
||||
) -> None:
|
||||
program_part = ""
|
||||
if archive_path.endswith(".zst"):
|
||||
if archive_path.suffix == ".zst":
|
||||
logging.info("zstd will be used for compression")
|
||||
program_part = "--use-compress-program='zstd --threads=0'"
|
||||
elif os.path.exists("/usr/bin/pigz"):
|
||||
elif PIGZ.exists():
|
||||
logging.info("pigz found, will compress and decompress faster")
|
||||
program_part = "--use-compress-program='pigz'"
|
||||
else:
|
||||
@ -32,27 +39,25 @@ def compress_fast(path, archive_path, exclude=None):
|
||||
else:
|
||||
exclude_part = f"--exclude {exclude}"
|
||||
|
||||
fname = os.path.basename(path)
|
||||
if os.path.isfile(path):
|
||||
path = os.path.dirname(path)
|
||||
else:
|
||||
path += "/.."
|
||||
fname = path.name
|
||||
|
||||
cmd = f"tar {program_part} {exclude_part} -cf {archive_path} -C {path} {fname}"
|
||||
cmd = (
|
||||
f"tar {program_part} {exclude_part} -cf {archive_path} -C {path.parent} {fname}"
|
||||
)
|
||||
logging.debug("compress_fast cmd: %s", cmd)
|
||||
subprocess.check_call(cmd, shell=True)
|
||||
|
||||
|
||||
def decompress_fast(archive_path, result_path=None):
|
||||
def decompress_fast(archive_path: Path, result_path: Optional[Path] = None) -> None:
|
||||
program_part = ""
|
||||
if archive_path.endswith(".zst"):
|
||||
if archive_path.suffix == ".zst":
|
||||
logging.info(
|
||||
"zstd will be used for decompression ('%s' -> '%s')",
|
||||
archive_path,
|
||||
result_path,
|
||||
)
|
||||
program_part = "--use-compress-program='zstd --threads=0'"
|
||||
elif os.path.exists("/usr/bin/pigz"):
|
||||
elif PIGZ.exists():
|
||||
logging.info(
|
||||
"pigz found, will compress and decompress faster ('%s' -> '%s')",
|
||||
archive_path,
|
||||
|
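After the compress_files changes above, compress_fast and decompress_fast take pathlib.Path arguments and pick the compressor from the archive suffix (.zst selects zstd, otherwise pigz or gzip). A small usage sketch under the assumption that the second argument of decompress_fast is the extraction directory; the paths are illustrative:

    from pathlib import Path

    from compress_files import compress_fast, decompress_fast

    logs_dir = Path("/tmp/logs")           # directory to archive (illustrative path)
    archive = Path("/tmp/logs.tar.zst")    # the .zst suffix selects zstd

    compress_fast(logs_dir, archive)             # tars logs_dir relative to its parent
    decompress_fast(archive, logs_dir.parent)    # unpacks next to the original directory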
@ -8,7 +8,6 @@ import shutil
|
||||
import subprocess
|
||||
import time
|
||||
import sys
|
||||
from glob import glob
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Set, Tuple, Union
|
||||
|
||||
@ -32,17 +31,6 @@ TEMP_PATH = os.path.join(RUNNER_TEMP, "docker_images_check")
|
||||
ImagesDict = Dict[str, dict]
|
||||
|
||||
|
||||
# workaround for mypy issue [1]:
|
||||
#
|
||||
# "Argument 1 to "map" has incompatible type overloaded function" [1]
|
||||
#
|
||||
# [1]: https://github.com/python/mypy/issues/9864
|
||||
#
|
||||
# NOTE: simply lambda will do the trick as well, but pylint will not like it
|
||||
def realpath(*args, **kwargs):
|
||||
return os.path.realpath(*args, **kwargs)
|
||||
|
||||
|
||||
class DockerImage:
|
||||
def __init__(
|
||||
self,
|
||||
@ -123,23 +111,8 @@ def get_changed_docker_images(
|
||||
changed_images = []
|
||||
|
||||
for dockerfile_dir, image_description in images_dict.items():
|
||||
source_dir = GITHUB_WORKSPACE.rstrip("/") + "/"
|
||||
dockerfile_files = glob(f"{source_dir}/{dockerfile_dir}/**", recursive=True)
|
||||
# resolve symlinks
|
||||
dockerfile_files = list(map(realpath, dockerfile_files))
|
||||
# trim prefix to get relative path again, to match with files_changed
|
||||
dockerfile_files = list(map(lambda x: x[len(source_dir) :], dockerfile_files))
|
||||
logging.info(
|
||||
"Docker %s (source_dir=%s) build context for PR %s @ %s: %s",
|
||||
dockerfile_dir,
|
||||
source_dir,
|
||||
pr_info.number,
|
||||
pr_info.sha,
|
||||
str(dockerfile_files),
|
||||
)
|
||||
|
||||
for f in files_changed:
|
||||
if f in dockerfile_files:
|
||||
if f.startswith(dockerfile_dir):
|
||||
name = image_description["name"]
|
||||
only_amd64 = image_description.get("only_amd64", False)
|
||||
logging.info(
|
||||
@ -272,8 +245,6 @@ def build_and_push_one_image(
|
||||
cache_from = f"{cache_from} --cache-from type=registry,ref={image.repo}:{tag}"
|
||||
|
||||
cmd = (
|
||||
# tar is requried to follow symlinks, since docker-build cannot do this
|
||||
f"tar -v --exclude-vcs-ignores --show-transformed-names --transform 's#{image.full_path.lstrip('/')}#./#' --dereference --create {image.full_path} | "
|
||||
"docker buildx build --builder default "
|
||||
f"--label build-url={GITHUB_RUN_URL} "
|
||||
f"{from_tag_arg}"
|
||||
@ -283,7 +254,7 @@ def build_and_push_one_image(
|
||||
f"{cache_from} "
|
||||
f"--cache-to type=inline,mode=max "
|
||||
f"{push_arg}"
|
||||
f"--progress plain -"
|
||||
f"--progress plain {image.full_path}"
|
||||
)
|
||||
logging.info("Docker command to run: %s", cmd)
|
||||
with TeePopen(cmd, build_log) as proc:
|
||||
|
@ -126,13 +126,12 @@ class TestDockerImageCheck(unittest.TestCase):
|
||||
mock_popen.assert_called_once()
|
||||
mock_machine.assert_not_called()
|
||||
self.assertIn(
|
||||
"tar -v --exclude-vcs-ignores --show-transformed-names --transform 's#path#./#' --dereference --create path | "
|
||||
f"docker buildx build --builder default --label build-url={GITHUB_RUN_URL} "
|
||||
"--build-arg FROM_TAG=version "
|
||||
f"--build-arg CACHE_INVALIDATOR={GITHUB_RUN_URL} "
|
||||
"--tag name:version --cache-from type=registry,ref=name:version "
|
||||
"--cache-from type=registry,ref=name:latest "
|
||||
"--cache-to type=inline,mode=max --push --progress plain -",
|
||||
"--cache-to type=inline,mode=max --push --progress plain path",
|
||||
mock_popen.call_args.args,
|
||||
)
|
||||
self.assertTrue(result)
|
||||
@ -144,13 +143,12 @@ class TestDockerImageCheck(unittest.TestCase):
|
||||
mock_popen.assert_called_once()
|
||||
mock_machine.assert_not_called()
|
||||
self.assertIn(
|
||||
"tar -v --exclude-vcs-ignores --show-transformed-names --transform 's#path#./#' --dereference --create path | "
|
||||
f"docker buildx build --builder default --label build-url={GITHUB_RUN_URL} "
|
||||
"--build-arg FROM_TAG=version2 "
|
||||
f"--build-arg CACHE_INVALIDATOR={GITHUB_RUN_URL} "
|
||||
"--tag name:version2 --cache-from type=registry,ref=name:version2 "
|
||||
"--cache-from type=registry,ref=name:latest "
|
||||
"--cache-to type=inline,mode=max --progress plain -",
|
||||
"--cache-to type=inline,mode=max --progress plain path",
|
||||
mock_popen.call_args.args,
|
||||
)
|
||||
self.assertTrue(result)
|
||||
@ -162,12 +160,11 @@ class TestDockerImageCheck(unittest.TestCase):
|
||||
mock_popen.assert_called_once()
|
||||
mock_machine.assert_not_called()
|
||||
self.assertIn(
|
||||
"tar -v --exclude-vcs-ignores --show-transformed-names --transform 's#path#./#' --dereference --create path | "
|
||||
f"docker buildx build --builder default --label build-url={GITHUB_RUN_URL} "
|
||||
f"--build-arg CACHE_INVALIDATOR={GITHUB_RUN_URL} "
|
||||
"--tag name:version2 --cache-from type=registry,ref=name:version2 "
|
||||
"--cache-from type=registry,ref=name:latest "
|
||||
"--cache-to type=inline,mode=max --progress plain -",
|
||||
"--cache-to type=inline,mode=max --progress plain path",
|
||||
mock_popen.call_args.args,
|
||||
)
|
||||
self.assertFalse(result)
|
||||
@ -181,14 +178,13 @@ class TestDockerImageCheck(unittest.TestCase):
|
||||
mock_popen.assert_called_once()
|
||||
mock_machine.assert_not_called()
|
||||
self.assertIn(
|
||||
"tar -v --exclude-vcs-ignores --show-transformed-names --transform 's#path#./#' --dereference --create path | "
|
||||
f"docker buildx build --builder default --label build-url={GITHUB_RUN_URL} "
|
||||
f"--build-arg CACHE_INVALIDATOR={GITHUB_RUN_URL} "
|
||||
"--tag name:version2 --cache-from type=registry,ref=name:version2 "
|
||||
"--cache-from type=registry,ref=name:latest "
|
||||
"--cache-from type=registry,ref=name:cached-version "
|
||||
"--cache-from type=registry,ref=name:another-cached "
|
||||
"--cache-to type=inline,mode=max --progress plain -",
|
||||
"--cache-to type=inline,mode=max --progress plain path",
|
||||
mock_popen.call_args.args,
|
||||
)
|
||||
self.assertFalse(result)
|
||||
|
@@ -196,10 +196,7 @@ def test_install(image: DockerImage, tests: Dict[str, str]) -> TestResults:
                status = FAIL
        copy2(log_file, LOGS_PATH)
        archive_path = TEMP_PATH / f"{container_name}-{retry}.tar.gz"
        compress_fast(
            LOGS_PATH.as_posix(),
            archive_path.as_posix(),
        )
        compress_fast(LOGS_PATH, archive_path)
        logs.append(archive_path)

        subprocess.check_call(f"docker kill -s 9 {container_id}", shell=True)
@ -1,11 +1,12 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import time
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
import argparse
|
||||
from pathlib import Path
|
||||
|
||||
import boto3 # type: ignore
|
||||
import requests # type: ignore
|
||||
@ -268,8 +269,8 @@ if __name__ == "__main__":
|
||||
description = "Found invalid analysis (ノಥ益ಥ)ノ ┻━┻"
|
||||
|
||||
compress_fast(
|
||||
os.path.join(result_path, "store"),
|
||||
os.path.join(result_path, "jepsen_store.tar.zst"),
|
||||
Path(result_path) / "store",
|
||||
Path(result_path) / "jepsen_store.tar.zst",
|
||||
)
|
||||
additional_data.append(os.path.join(result_path, "jepsen_store.tar.zst"))
|
||||
except Exception as ex:
|
||||
|
@@ -102,7 +102,11 @@ class S3Helper:
                    file_path,
                    file_path + ".zst",
                )
                compress_file_fast(file_path, file_path + ".zst")
                # FIXME: rewrite S3 to Path
                _file_path = Path(file_path)
                compress_file_fast(
                    _file_path, _file_path.with_suffix(_file_path.suffix + ".zst")
                )
                file_path += ".zst"
                s3_path += ".zst"
            else:
@ -281,7 +281,7 @@ def test_reload_after_fail_in_cache_dictionary(started_cluster):
|
||||
query_and_get_error = instance.query_and_get_error
|
||||
|
||||
# Can't get a value from the cache dictionary because the source (table `test.xypairs`) doesn't respond.
|
||||
expected_error = "Table test.xypairs doesn't exist"
|
||||
expected_error = "Table test.xypairs does not exist"
|
||||
update_error = "Could not update cache dictionary cache_xypairs now"
|
||||
assert expected_error in query_and_get_error(
|
||||
"SELECT dictGetUInt64('cache_xypairs', 'y', toUInt64(1))"
|
||||
|
@ -125,7 +125,7 @@ def test_query_is_lock_free(lock_free_query, exclusive_table):
|
||||
SELECT count() FROM {exclusive_table};
|
||||
"""
|
||||
)
|
||||
assert f"Table default.{exclusive_table} doesn't exist" in result
|
||||
assert f"Table default.{exclusive_table} does not exist" in result
|
||||
else:
|
||||
assert 0 == int(
|
||||
node.query(
|
||||
|
@ -159,11 +159,11 @@ def test_drop_replica(start_cluster):
|
||||
for i in range(1, 5):
|
||||
node_1_1.query("DETACH DATABASE test{}".format(i))
|
||||
|
||||
assert "doesn't exist" in node_1_3.query_and_get_error(
|
||||
assert "does not exist" in node_1_3.query_and_get_error(
|
||||
"SYSTEM DROP REPLICA 'node_1_1' FROM TABLE test.test_table"
|
||||
)
|
||||
|
||||
assert "doesn't exist" in node_1_3.query_and_get_error(
|
||||
assert "does not exist" in node_1_3.query_and_get_error(
|
||||
"SYSTEM DROP REPLICA 'node_1_1' FROM DATABASE test1"
|
||||
)
|
||||
|
||||
|
@ -246,7 +246,7 @@ def test_inserts_local(started_cluster):
|
||||
|
||||
def test_inserts_single_replica_local_internal_replication(started_cluster):
|
||||
with pytest.raises(
|
||||
QueryRuntimeException, match="Table default.single_replicated doesn't exist"
|
||||
QueryRuntimeException, match="Table default.single_replicated does not exist"
|
||||
):
|
||||
node1.query(
|
||||
"INSERT INTO distributed_one_replica_internal_replication VALUES ('2000-01-01', 1)",
|
||||
@ -279,7 +279,8 @@ def test_inserts_single_replica_internal_replication(started_cluster):
|
||||
def test_inserts_single_replica_no_internal_replication(started_cluster):
|
||||
try:
|
||||
with pytest.raises(
|
||||
QueryRuntimeException, match="Table default.single_replicated doesn't exist"
|
||||
QueryRuntimeException,
|
||||
match="Table default.single_replicated does not exist",
|
||||
):
|
||||
node1.query(
|
||||
"INSERT INTO distributed_one_replica_no_internal_replication VALUES ('2000-01-01', 1)",
|
||||
|
@ -207,7 +207,7 @@ def test_mysql_client(started_cluster):
|
||||
expected_msg = "\n".join(
|
||||
[
|
||||
"mysql: [Warning] Using a password on the command line interface can be insecure.",
|
||||
"ERROR 81 (00000) at line 1: Code: 81. DB::Exception: Database system2 doesn't exist",
|
||||
"ERROR 81 (00000) at line 1: Code: 81. DB::Exception: Database system2 does not exist",
|
||||
]
|
||||
)
|
||||
assert stderr[: len(expected_msg)].decode() == expected_msg
|
||||
@ -621,7 +621,7 @@ def test_python_client(started_cluster):
|
||||
client.select_db("system2")
|
||||
|
||||
assert exc_info.value.args[1].startswith(
|
||||
"Code: 81. DB::Exception: Database system2 doesn't exist"
|
||||
"Code: 81. DB::Exception: Database system2 does not exist"
|
||||
), exc_info.value.args[1]
|
||||
|
||||
cursor = client.cursor(pymysql.cursors.DictCursor)
|
||||
@ -646,7 +646,7 @@ def test_golang_client(started_cluster, golang_container):
|
||||
)
|
||||
|
||||
assert code == 1
|
||||
assert stderr.decode() == "Error 81: Database abc doesn't exist\n"
|
||||
assert stderr.decode() == "Error 81: Database abc does not exist\n"
|
||||
|
||||
code, (stdout, stderr) = golang_container.exec_run(
|
||||
"./main --host {host} --port {port} --user default --password 123 --database "
|
||||
|
@ -27,7 +27,10 @@ proto_dir = os.path.join(SCRIPT_DIR, "./protos")
|
||||
gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
|
||||
os.makedirs(gen_dir, exist_ok=True)
|
||||
run_and_check(
|
||||
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
|
||||
"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} \
|
||||
{proto_dir}/clickhouse_grpc.proto".format(
|
||||
proto_dir=proto_dir, gen_dir=gen_dir
|
||||
),
|
||||
shell=True,
|
||||
)
|
||||
|
||||
|
@ -1 +0,0 @@
|
||||
_gen
|
@ -1,9 +0,0 @@
|
||||
<clickhouse>
|
||||
<logger>
|
||||
<level>trace</level>
|
||||
<log>/var/log/clickhouse-server/clickhouse-server.log</log>
|
||||
<errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
|
||||
<size>1000M</size>
|
||||
<count>10</count>
|
||||
</logger>
|
||||
</clickhouse>
|
@ -1,9 +0,0 @@
|
||||
<clickhouse>
|
||||
<postgresql_port>5433</postgresql_port>
|
||||
<mysql_port>9001</mysql_port>
|
||||
<grpc_port>9100</grpc_port>
|
||||
<grpc replace="replace">
|
||||
<!-- Enable if you want very detailed logs -->
|
||||
<verbose_logs>false</verbose_logs>
|
||||
</grpc>
|
||||
</clickhouse>
|
@ -1,9 +0,0 @@
|
||||
<clickhouse>
|
||||
<session_log>
|
||||
<database>system</database>
|
||||
<table>session_log</table>
|
||||
|
||||
<partition_by>toYYYYMM(event_date)</partition_by>
|
||||
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
|
||||
</session_log>
|
||||
</clickhouse>
|
@ -1,23 +0,0 @@
|
||||
<clickhouse>
|
||||
<profiles>
|
||||
<default>
|
||||
<function_sleep_max_microseconds_per_block>0</function_sleep_max_microseconds_per_block>
|
||||
</default>
|
||||
</profiles>
|
||||
<users>
|
||||
<default>
|
||||
</default>
|
||||
<mysql_user>
|
||||
<password>pass</password>
|
||||
</mysql_user>
|
||||
<postgres_user>
|
||||
<password>pass</password>
|
||||
</postgres_user>
|
||||
<grpc_user>
|
||||
<password>pass</password>
|
||||
</grpc_user>
|
||||
<parallel_user>
|
||||
<password>pass</password>
|
||||
</parallel_user>
|
||||
</users>
|
||||
</clickhouse>
|
@ -1 +0,0 @@
|
||||
../../../../src/Server/grpc_protos/clickhouse_grpc.proto
|
@ -1,289 +0,0 @@
|
||||
import os
|
||||
|
||||
import grpc
|
||||
import pymysql.connections
|
||||
import psycopg2 as py_psql
|
||||
import pytest
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from helpers.cluster import ClickHouseCluster, run_and_check
|
||||
|
||||
POSTGRES_SERVER_PORT = 5433
|
||||
MYSQL_SERVER_PORT = 9001
|
||||
GRPC_PORT = 9100
|
||||
SESSION_LOG_MATCHING_FIELDS = "auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"
|
||||
|
||||
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
||||
DEFAULT_ENCODING = "utf-8"
|
||||
|
||||
# Use grpcio-tools to generate *pb2.py files from *.proto.
|
||||
proto_dir = os.path.join(SCRIPT_DIR, "./protos")
|
||||
gen_dir = os.path.join(SCRIPT_DIR, "./_gen")
|
||||
os.makedirs(gen_dir, exist_ok=True)
|
||||
run_and_check(
|
||||
f"python3 -m grpc_tools.protoc -I{proto_dir} --python_out={gen_dir} --grpc_python_out={gen_dir} {proto_dir}/clickhouse_grpc.proto",
|
||||
shell=True,
|
||||
)
|
||||
|
||||
sys.path.append(gen_dir)
|
||||
|
||||
import clickhouse_grpc_pb2
|
||||
import clickhouse_grpc_pb2_grpc
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
instance = cluster.add_instance(
|
||||
"node",
|
||||
main_configs=[
|
||||
"configs/ports.xml",
|
||||
"configs/log.xml",
|
||||
"configs/session_log.xml",
|
||||
],
|
||||
user_configs=["configs/users.xml"],
|
||||
# Bug in TSAN reproduces in this test https://github.com/grpc/grpc/issues/29550#issuecomment-1188085387
|
||||
env_variables={
|
||||
"TSAN_OPTIONS": "report_atomic_races=0 " + os.getenv("TSAN_OPTIONS", default="")
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def grpc_get_url():
|
||||
return f"{instance.ip_address}:{GRPC_PORT}"
|
||||
|
||||
|
||||
def grpc_create_insecure_channel():
|
||||
channel = grpc.insecure_channel(grpc_get_url())
|
||||
grpc.channel_ready_future(channel).result(timeout=2)
|
||||
return channel
|
||||
|
||||
|
||||
session_id_counter = 0
|
||||
|
||||
|
||||
def next_session_id():
|
||||
global session_id_counter
|
||||
session_id = session_id_counter
|
||||
session_id_counter += 1
|
||||
return str(session_id)
|
||||
|
||||
|
||||
def grpc_query(query, user_, pass_, raise_exception):
|
||||
try:
|
||||
query_info = clickhouse_grpc_pb2.QueryInfo(
|
||||
query=query,
|
||||
session_id=next_session_id(),
|
||||
user_name=user_,
|
||||
password=pass_,
|
||||
)
|
||||
channel = grpc_create_insecure_channel()
|
||||
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
|
||||
result = stub.ExecuteQuery(query_info)
|
||||
if result and result.HasField("exception"):
|
||||
raise Exception(result.exception.display_text)
|
||||
|
||||
return result.output.decode(DEFAULT_ENCODING)
|
||||
except Exception:
|
||||
assert raise_exception
|
||||
|
||||
|
||||
def postgres_query(query, user_, pass_, raise_exception):
|
||||
try:
|
||||
client = py_psql.connect(
|
||||
host=instance.ip_address,
|
||||
port=POSTGRES_SERVER_PORT,
|
||||
user=user_,
|
||||
password=pass_,
|
||||
database="default",
|
||||
)
|
||||
cursor = client.cursor()
|
||||
cursor.execute(query)
|
||||
cursor.fetchall()
|
||||
except Exception:
|
||||
assert raise_exception
|
||||
|
||||
|
||||
def mysql_query(query, user_, pass_, raise_exception):
|
||||
try:
|
||||
client = pymysql.connections.Connection(
|
||||
host=instance.ip_address,
|
||||
user=user_,
|
||||
password=pass_,
|
||||
database="default",
|
||||
port=MYSQL_SERVER_PORT,
|
||||
)
|
||||
cursor = client.cursor(pymysql.cursors.DictCursor)
|
||||
if raise_exception:
|
||||
with pytest.raises(Exception):
|
||||
cursor.execute(query)
|
||||
else:
|
||||
cursor.execute(query)
|
||||
cursor.fetchall()
|
||||
except Exception:
|
||||
assert raise_exception
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def started_cluster():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_grpc_session(started_cluster):
|
||||
grpc_query("SELECT 1", "grpc_user", "pass", False)
|
||||
grpc_query("SELECT 2", "grpc_user", "wrong_pass", True)
|
||||
grpc_query("SELECT 3", "wrong_grpc_user", "pass", True)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
login_success_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginSuccess'"
|
||||
)
|
||||
assert login_success_records == "grpc_user\t1\t1\n"
|
||||
logout_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'Logout'"
|
||||
)
|
||||
assert logout_records == "grpc_user\t1\t1\n"
|
||||
login_failure_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='grpc_user' AND type = 'LoginFailure'"
|
||||
)
|
||||
assert login_failure_records == "grpc_user\t1\t1\n"
|
||||
logins_and_logouts = instance.query(
|
||||
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'grpc_user' AND type = 'Logout')"
|
||||
)
|
||||
assert logins_and_logouts == "1\n"
|
||||
|
||||
|
||||
def test_mysql_session(started_cluster):
|
||||
mysql_query("SELECT 1", "mysql_user", "pass", False)
|
||||
mysql_query("SELECT 2", "mysql_user", "wrong_pass", True)
|
||||
mysql_query("SELECT 3", "wrong_mysql_user", "pass", True)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
login_success_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginSuccess'"
|
||||
)
|
||||
assert login_success_records == "mysql_user\t1\t1\n"
|
||||
logout_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'Logout'"
|
||||
)
|
||||
assert logout_records == "mysql_user\t1\t1\n"
|
||||
login_failure_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='mysql_user' AND type = 'LoginFailure'"
|
||||
)
|
||||
assert login_failure_records == "mysql_user\t1\t1\n"
|
||||
logins_and_logouts = instance.query(
|
||||
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'mysql_user' AND type = 'Logout')"
|
||||
)
|
||||
assert logins_and_logouts == "1\n"
|
||||
|
||||
|
||||
def test_postgres_session(started_cluster):
|
||||
postgres_query("SELECT 1", "postgres_user", "pass", False)
|
||||
postgres_query("SELECT 2", "postgres_user", "wrong_pass", True)
|
||||
postgres_query("SELECT 3", "wrong_postgres_user", "pass", True)
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
login_success_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginSuccess'"
|
||||
)
|
||||
assert login_success_records == "postgres_user\t1\t1\n"
|
||||
logout_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'Logout'"
|
||||
)
|
||||
assert logout_records == "postgres_user\t1\t1\n"
|
||||
login_failure_records = instance.query(
|
||||
"SELECT user, client_port <> 0, client_address <> toIPv6('::') FROM system.session_log WHERE user='postgres_user' AND type = 'LoginFailure'"
|
||||
)
|
||||
assert login_failure_records == "postgres_user\t1\t1\n"
|
||||
logins_and_logouts = instance.query(
|
||||
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'postgres_user' AND type = 'Logout')"
|
||||
)
|
||||
assert logins_and_logouts == "1\n"
|
||||
|
||||
|
||||
def test_parallel_sessions(started_cluster):
|
||||
thread_list = []
|
||||
for _ in range(10):
|
||||
# Sleep time does not significantly matter here,
|
||||
# test should pass even without sleeping.
|
||||
for function in [postgres_query, grpc_query, mysql_query]:
|
||||
thread = threading.Thread(
|
||||
target=function,
|
||||
args=(
|
||||
f"SELECT sleep({random.uniform(0.03, 0.04)})",
|
||||
"parallel_user",
|
||||
"pass",
|
||||
False,
|
||||
),
|
||||
)
|
||||
thread.start()
|
||||
thread_list.append(thread)
|
||||
thread = threading.Thread(
|
||||
target=function,
|
||||
args=(
|
||||
f"SELECT sleep({random.uniform(0.03, 0.04)})",
|
||||
"parallel_user",
|
||||
"wrong_pass",
|
||||
True,
|
||||
),
|
||||
)
|
||||
thread.start()
|
||||
thread_list.append(thread)
|
||||
thread = threading.Thread(
|
||||
target=function,
|
||||
args=(
|
||||
f"SELECT sleep({random.uniform(0.03, 0.04)})",
|
||||
"wrong_parallel_user",
|
||||
"pass",
|
||||
True,
|
||||
),
|
||||
)
|
||||
thread.start()
|
||||
thread_list.append(thread)
|
||||
|
||||
for thread in thread_list:
|
||||
thread.join()
|
||||
|
||||
instance.query("SYSTEM FLUSH LOGS")
|
||||
port_0_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user'"
|
||||
)
|
||||
assert port_0_sessions == "90\n"
|
||||
|
||||
port_0_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_port = 0"
|
||||
)
|
||||
assert port_0_sessions == "0\n"
|
||||
|
||||
address_0_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND client_address = toIPv6('::')"
|
||||
)
|
||||
assert address_0_sessions == "0\n"
|
||||
|
||||
grpc_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'gRPC'"
|
||||
)
|
||||
assert grpc_sessions == "30\n"
|
||||
|
||||
mysql_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'MySQL'"
|
||||
)
|
||||
assert mysql_sessions == "30\n"
|
||||
|
||||
postgres_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND interface = 'PostgreSQL'"
|
||||
)
|
||||
assert postgres_sessions == "30\n"
|
||||
|
||||
logins_and_logouts = instance.query(
|
||||
f"SELECT COUNT(*) FROM (SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginSuccess' INTERSECT SELECT {SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = 'parallel_user' AND type = 'Logout')"
|
||||
)
|
||||
assert logins_and_logouts == "30\n"
|
||||
|
||||
logout_failure_sessions = instance.query(
|
||||
f"SELECT COUNT(*) FROM system.session_log WHERE user = 'parallel_user' AND type = 'LoginFailure'"
|
||||
)
|
||||
assert logout_failure_sessions == "30\n"
|
@@ -47,7 +47,7 @@ def test_system_logs(flush_logs, table, exists):
    else:
        response = node.query_and_get_error(q)
        assert (
            "Table {} doesn't exist".format(table) in response
            "Table {} does not exist".format(table) in response
            or "Unknown table expression identifier '{}'".format(table) in response
        )

tests/integration/test_wrong_db_or_table_name/test.py (new file, 108 lines)
@@ -0,0 +1,108 @@
|
||||
import pytest
|
||||
from helpers.client import QueryRuntimeException
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
cluster = ClickHouseCluster(__file__)
|
||||
node = cluster.add_instance("node")
|
||||
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def start():
|
||||
try:
|
||||
cluster.start()
|
||||
yield cluster
|
||||
finally:
|
||||
cluster.shutdown()
|
||||
|
||||
|
||||
def test_wrong_database_name(start):
|
||||
node.query(
|
||||
"""
|
||||
CREATE DATABASE test;
|
||||
CREATE TABLE test.table_test (i Int64) ENGINE=Memory;
|
||||
INSERT INTO test.table_test SELECT 1;
|
||||
"""
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
QueryRuntimeException,
|
||||
match="DB::Exception: Database tes does not exist. Maybe you meant test?.",
|
||||
):
|
||||
node.query("SELECT * FROM tes.table_test LIMIT 1;")
|
||||
assert int(node.query("SELECT count() FROM test.table_test;")) == 1
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE test.table_test;
|
||||
DROP DATABASE test;
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def test_drop_wrong_database_name(start):
|
||||
node.query(
|
||||
"""
|
||||
CREATE DATABASE test;
|
||||
CREATE TABLE test.table_test (i Int64) ENGINE=Memory;
|
||||
INSERT INTO test.table_test SELECT 1;
|
||||
"""
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
QueryRuntimeException,
|
||||
match="DB::Exception: Database tes does not exist. Maybe you meant test?.",
|
||||
):
|
||||
node.query("DROP DATABASE tes;")
|
||||
assert int(node.query("SELECT count() FROM test.table_test;")) == 1
|
||||
node.query("DROP DATABASE test;")
|
||||
|
||||
|
||||
def test_wrong_table_name(start):
|
||||
node.query(
|
||||
"""
|
||||
CREATE DATABASE test;
|
||||
CREATE TABLE test.table_test (i Int64) ENGINE=Memory;
|
||||
CREATE TABLE test.table_test2 (i Int64) ENGINE=Memory;
|
||||
INSERT INTO test.table_test SELECT 1;
|
||||
"""
|
||||
)
|
||||
with pytest.raises(
|
||||
QueryRuntimeException,
|
||||
match="DB::Exception: Table test.table_test1 does not exist. Maybe you meant table_test?.",
|
||||
):
|
||||
node.query(
|
||||
"""
|
||||
SELECT * FROM test.table_test1 LIMIT 1;
|
||||
"""
|
||||
)
|
||||
assert int(node.query("SELECT count() FROM test.table_test;")) == 1
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE test.table_test;
|
||||
DROP TABLE test.table_test2;
|
||||
DROP DATABASE test;
|
||||
"""
|
||||
)
|
||||
|
||||
|
||||
def test_drop_wrong_table_name(start):
|
||||
node.query(
|
||||
"""
|
||||
CREATE DATABASE test;
|
||||
CREATE TABLE test.table_test (i Int64) ENGINE=Memory;
|
||||
INSERT INTO test.table_test SELECT 1;
|
||||
"""
|
||||
)
|
||||
|
||||
with pytest.raises(
|
||||
QueryRuntimeException,
|
||||
match="DB::Exception: Table test.table_tes does not exist. Maybe you meant table_test?.",
|
||||
):
|
||||
node.query("DROP TABLE test.table_tes;")
|
||||
assert int(node.query("SELECT count() FROM test.table_test;")) == 1
|
||||
node.query(
|
||||
"""
|
||||
DROP TABLE test.table_test;
|
||||
DROP DATABASE test;
|
||||
"""
|
||||
)
|
@ -32,8 +32,8 @@ create temporary table known_short_messages (s String) as select * from (select
|
||||
'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
|
||||
'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}',
|
||||
'Query was cancelled', 'Cancelled merging parts', 'Cancelled mutating parts', 'Log pulling is cancelled',
|
||||
'Transaction was cancelled', 'Could not find table: {}', 'Table {} doesn''t exist',
|
||||
'Database {} doesn''t exist', 'Dictionary ({}) not found', 'Unknown table function {}',
|
||||
'Transaction was cancelled', 'Could not find table: {}', 'Table {} does not exist',
|
||||
'Database {} does not exist', 'Dictionary ({}) not found', 'Unknown table function {}',
|
||||
'Unknown format {}', 'Unknown explain kind ''{}''', 'Unknown setting {}', 'Unknown input format {}',
|
||||
'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
|
||||
'Attempt to read after eof', 'String size is too big ({}), maximum: {}', 'API mode: {}'
|
||||
@ -48,7 +48,7 @@ select 'messages shorter than 16', max2(countDistinctOrDefault(message_format_st
|
||||
-- Unlike above, here we look at length of the formatted message, not format string. Most short format strings are fine because they end up decorated with context from outer or inner exceptions, e.g.:
|
||||
-- "Expected end of line" -> "Code: 117. DB::Exception: Expected end of line: (in file/uri /var/lib/clickhouse/user_files/data_02118): (at row 1)"
|
||||
-- But we have to cut out the boilerplate, e.g.:
|
||||
-- "Code: 60. DB::Exception: Table default.a doesn't exist. (UNKNOWN_TABLE), Stack trace" -> "Table default.a doesn't exist."
|
||||
-- "Code: 60. DB::Exception: Table default.a does not exist. (UNKNOWN_TABLE), Stack trace" -> "Table default.a does not exist."
|
||||
-- This table currently doesn't have enough information to do this reliably, so we just regex search for " (ERROR_NAME_IN_CAPS)" and hope that's good enough.
|
||||
-- For the "Code: 123. DB::Exception: " part, we just subtract 26 instead of searching for it. Because sometimes it's not at the start, e.g.:
|
||||
-- "Unexpected error, will try to restart main thread: Code: 341. DB::Exception: Unexpected error: Code: 57. DB::Exception:[...]"
|
||||
|
@ -24,7 +24,7 @@ $CLICKHOUSE_CLIENT --function_sleep_max_microseconds_per_block 15000000 -q "INSE
|
||||
sleep 1
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "RENAME DATABASE test_01192 TO default" 2>&1| grep -F "already exists" > /dev/null && echo "ok"
|
||||
$CLICKHOUSE_CLIENT -q "RENAME DATABASE test_01192_notexisting TO test_01192_renamed" 2>&1| grep -F "doesn't exist" > /dev/null && echo "ok"
|
||||
$CLICKHOUSE_CLIENT -q "RENAME DATABASE test_01192_notexisting TO test_01192_renamed" 2>&1| grep -F "does not exist" > /dev/null && echo "ok"
|
||||
$CLICKHOUSE_CLIENT -q "RENAME DATABASE test_01192 TO test_01192_renamed" && echo "renamed"
|
||||
wait
|
||||
|
||||
@ -50,7 +50,7 @@ $CLICKHOUSE_CLIENT -q "RENAME TABLE test_01192.mt TO test_01192_atomic.mt, test_
|
||||
# 6. check data after RENAME
|
||||
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM test_01192_atomic.mt"
|
||||
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM test_01192_atomic.rmt"
|
||||
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM test_01192_atomic.mv" 2>&1| grep -F "doesn't exist" > /dev/null && echo "ok"
|
||||
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM test_01192_atomic.mv" 2>&1| grep -F "does not exist" > /dev/null && echo "ok"
|
||||
|
||||
# 7. create dictionary and check it
|
||||
$CLICKHOUSE_CLIENT -q "CREATE TABLE test_01192.mt (n UInt64, _part String) ENGINE=Memory" # mock
|
||||
|
@ -1,3 +1,5 @@
|
||||
-- Tags: no-random-settings, no-asan, no-msan, no-tsan, no-ubsan, no-debug
|
||||
|
||||
select count() from
|
||||
(
|
||||
select toInt128(number) * number x, toInt256(number) * number y from numbers_mt(100000000) where x != y
|
||||
|
@ -108,7 +108,7 @@ clickhouse_local "INSERT INTO db_ordinary.src SELECT * FROM numbers(10)"
|
||||
clickhouse_local "SELECT if(count() = 10, 'MV is working', 'MV failed') FROM db_ordinary.src_mv_with_inner"
|
||||
|
||||
clickhouse_local "DETACH VIEW db_ordinary.src_mv_with_inner PERMANENTLY; INSERT INTO db_ordinary.src SELECT * FROM numbers(10)" --stacktrace
|
||||
clickhouse_local "SELECT if(count() = 10, 'MV can be detached permanently', 'MV detach failed') FROM db_ordinary.src_mv_with_inner" 2>&1 | grep -c "db_ordinary.src_mv_with_inner doesn't exist"
|
||||
clickhouse_local "SELECT if(count() = 10, 'MV can be detached permanently', 'MV detach failed') FROM db_ordinary.src_mv_with_inner" 2>&1 | grep -c "db_ordinary.src_mv_with_inner does not exist"
|
||||
## Quite silly: ATTACH MATERIALIZED VIEW don't work with short syntax (w/o select), but i can attach it using ATTACH TABLE ...
|
||||
clickhouse_local "ATTACH TABLE db_ordinary.src_mv_with_inner"
|
||||
clickhouse_local "INSERT INTO db_ordinary.src SELECT * FROM numbers(10)"
|
||||
|
@ -6,4 +6,4 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CUR_DIR"/../shell_config.sh
|
||||
|
||||
${CLICKHOUSE_LOCAL} --query "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | ${CLICKHOUSE_CURL} -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table doesn't exist" && echo 'Ok.' || echo 'FAIL' ||:
|
||||
${CLICKHOUSE_LOCAL} --query "select col1, initializeAggregation('argMaxState', col2, insertTime) as col2, now() as insertTime FROM generateRandom('col1 String, col2 Array(Float64)') LIMIT 1000000 FORMAT CSV" | ${CLICKHOUSE_CURL} -s 'http://localhost:8123/?query=INSERT%20INTO%20non_existing_table%20SELECT%20col1%2C%20initializeAggregation(%27argMaxState%27%2C%20col2%2C%20insertTime)%20as%20col2%2C%20now()%20as%20insertTime%20FROM%20input(%27col1%20String%2C%20col2%20Array(Float64)%27)%20FORMAT%20CSV' --data-binary @- | grep -q "Table default.non_existing_table does not exist" && echo 'Ok.' || echo 'FAIL' ||:
|
||||
|
@@ -0,0 +1,6 @@
2
0
3355402240
3355402240
3321851904
3321851904

@@ -0,0 +1,26 @@
set mutations_sync = 2;

drop table if exists t_delete_skip_index;

create table t_delete_skip_index (x UInt32, y String, index i y type minmax granularity 3) engine = MergeTree order by tuple();
insert into t_delete_skip_index select number, toString(number) from numbers(8192 * 10);

select count() from t_delete_skip_index where y in (4, 5);
alter table t_delete_skip_index delete where x < 8192;
select count() from t_delete_skip_index where y in (4, 5);

drop table if exists t_delete_skip_index;
drop table if exists t_delete_projection;

create table t_delete_projection (x UInt32, y UInt64, projection p (select sum(y))) engine = MergeTree order by tuple();
insert into t_delete_projection select number, toString(number) from numbers(8192 * 10);

select sum(y) from t_delete_projection settings optimize_use_projections = 0;
select sum(y) from t_delete_projection settings optimize_use_projections = 0, force_optimize_projection = 1;

alter table t_delete_projection delete where x < 8192;

select sum(y) from t_delete_projection settings optimize_use_projections = 0;
select sum(y) from t_delete_projection settings optimize_use_projections = 0, force_optimize_projection = 1;

drop table if exists t_delete_projection;
@ -1,34 +0,0 @@
|
||||
sessions:
|
||||
150
|
||||
port_0_sessions:
|
||||
0
|
||||
address_0_sessions:
|
||||
0
|
||||
tcp_sessions
|
||||
60
|
||||
http_sessions
|
||||
30
|
||||
http_with_session_id_sessions
|
||||
30
|
||||
my_sql_sessions
|
||||
30
|
||||
Corresponding LoginSuccess/Logout
|
||||
10
|
||||
LoginFailure
|
||||
10
|
||||
Corresponding LoginSuccess/Logout
|
||||
10
|
||||
LoginFailure
|
||||
10
|
||||
Corresponding LoginSuccess/Logout
|
||||
10
|
||||
LoginFailure
|
||||
10
|
||||
Corresponding LoginSuccess/Logout
|
||||
10
|
||||
LoginFailure
|
||||
10
|
||||
Corresponding LoginSuccess/Logout
|
||||
10
|
||||
LoginFailure
|
||||
10
|
@ -1,138 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-debug

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

readonly PID=$$

# Each user uses a separate thread.
readonly TCP_USERS=( "02833_TCP_USER_${PID}"_{1,2} ) # 2 concurrent TCP users
readonly HTTP_USERS=( "02833_HTTP_USER_${PID}" )
readonly HTTP_WITH_SESSION_ID_SESSION_USERS=( "02833_HTTP_WITH_SESSION_ID_USER_${PID}" )
readonly MYSQL_USERS=( "02833_MYSQL_USER_${PID}")
readonly ALL_USERS=( "${TCP_USERS[@]}" "${HTTP_USERS[@]}" "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}" "${MYSQL_USERS[@]}" )

readonly TCP_USERS_SQL_COLLECTION_STRING="$( echo "${TCP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING="$( echo "${HTTP_WITH_SESSION_ID_SESSION_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly MYSQL_USERS_SQL_COLLECTION_STRING="$( echo "${MYSQL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"
readonly ALL_USERS_SQL_COLLECTION_STRING="$( echo "${ALL_USERS[*]}" | sed "s/[^[:space:]]\+/'&'/g" | sed 's/[[:space:]]/,/g' )"

readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"

for user in "${ALL_USERS[@]}"; do
    ${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${user} IDENTIFIED WITH plaintext_password BY 'pass'"
    ${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${user}"
    ${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${user}";
done

# All <type>_session functions execute in separate threads.
# Each function tries to create a session with a successful login and logout,
# sleeps a small, random amount of time to make concurrency more intense,
# and then tries to log in with an invalid password.
function tcp_session()
{
    local user=$1
    local i=0
    while (( (i++) < 10 )); do
        # login logout
        ${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM})" --user="${user}" --password="pass"
        # login failure
        ${CLICKHOUSE_CLIENT} -q "SELECT 2" --user="${user}" --password 'invalid'
    done
}

function http_session()
{
    local user=$1
    local i=0
    while (( (i++) < 10 )); do
        # login logout
        ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT 3, sleep(0.01${RANDOM})"

        # login failure
        ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=wrong" -d "SELECT 4"
    done
}

function http_with_session_id_session()
{
    local user=$1
    local i=0
    while (( (i++) < 10 )); do
        # login logout
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT 5, sleep 0.01${RANDOM}"
|
||||

        # login failure
        ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=wrong" -d "SELECT 6"
    done
}

function mysql_session()
{
    local user=$1
    local i=0
    while (( (i++) < 10 )); do
        # login logout
        ${CLICKHOUSE_CLIENT} -q "SELECT 1, sleep(0.01${RANDOM}) FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'pass')"

        # login failure
        ${CLICKHOUSE_CLIENT} -q "SELECT 1 FROM mysql('127.0.0.1:9004', 'system', 'one', '${user}', 'wrong', SETTINGS connection_max_tries=1)"
    done
}

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"

export -f tcp_session;
export -f http_session;
export -f http_with_session_id_session;
export -f mysql_session;

for user in "${TCP_USERS[@]}"; do
    timeout 60s bash -c "tcp_session ${user}" >/dev/null 2>&1 &
done

for user in "${HTTP_USERS[@]}"; do
    timeout 60s bash -c "http_session ${user}" >/dev/null 2>&1 &
done

for user in "${HTTP_WITH_SESSION_ID_SESSION_USERS[@]}"; do
    timeout 60s bash -c "http_with_session_id_session ${user}" >/dev/null 2>&1 &
done

for user in "${MYSQL_USERS[@]}"; do
    timeout 60s bash -c "mysql_session ${user}" >/dev/null 2>&1 &
done

wait

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"

echo "sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING})"

echo "port_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_port = 0"

echo "address_0_sessions:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${ALL_USERS_SQL_COLLECTION_STRING}) AND client_address = toIPv6('::')"

echo "tcp_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${TCP_USERS_SQL_COLLECTION_STRING}) AND interface = 'TCP'"
echo "http_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "http_with_session_id_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${HTTP_WITH_SESSION_ID_USERS_SQL_COLLECTION_STRING}) AND interface = 'HTTP'"
echo "my_sql_sessions"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user IN (${MYSQL_USERS_SQL_COLLECTION_STRING}) AND interface = 'MySQL'"

for user in "${ALL_USERS[@]}"; do
    ${CLICKHOUSE_CLIENT} -q "DROP USER ${user}"
    echo "Corresponding LoginSuccess/Logout"
    ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${user}' AND type = 'Logout')"
    echo "LoginFailure"
    ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${user}' AND type = 'LoginFailure'"
done
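The per-user check in the loop above counts a login only when an identically-keyed logout exists, by intersecting the two sets of session_log rows on the matching fields. A minimal sketch of that idea, with a placeholder user name and a reduced column list (not a line from the test):

    # 'example_user' is a placeholder; the test substitutes each generated user.
    ${CLICKHOUSE_CLIENT} -q "
        SELECT count() FROM
        (
            SELECT auth_id, interface FROM system.session_log WHERE user = 'example_user' AND type = 'LoginSuccess'
            INTERSECT
            SELECT auth_id, interface FROM system.session_log WHERE user = 'example_user' AND type = 'Logout'
        )"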
@ -1,13 +0,0 @@
0
0
0
0
client_port 0 connections:
0
client_address '::' connections:
0
login failures:
0
TCP Login and logout count is equal
HTTP Login and logout count is equal
MySQL Login and logout count is equal
@ -1,56 +0,0 @@
#!/usr/bin/env bash
# Tags: no-fasttest

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

readonly PID=$$
readonly TEST_USER=$"02834_USER_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"

${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER} IDENTIFIED WITH plaintext_password BY 'pass'"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON INFORMATION_SCHEMA.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.* TO ${TEST_USER}"
${CLICKHOUSE_CLIENT} -q "GRANT CREATE TEMPORARY TABLE, MYSQL, REMOTE ON *.* TO ${TEST_USER}"

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"

${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
    -d "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')"

${CLICKHOUSE_CURL} -sS -X POST "${CLICKHOUSE_URL}&user=${TEST_USER}&password=pass" \
    -d "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')"

${CLICKHOUSE_CLIENT} -q "SELECT * FROM remote('127.0.0.1:${CLICKHOUSE_PORT_TCP}', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"
${CLICKHOUSE_CLIENT} -q "SELECT * FROM mysql('127.0.0.1:9004', 'system', 'one', '${TEST_USER}', 'pass')" -u "${TEST_USER}" --password "pass"

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"

echo "client_port 0 connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_port = 0"

echo "client_address '::' connections:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and client_address = toIPv6('::')"

echo "login failures:"
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' and type = 'LoginFailure'"

# The remote(...) table function sometimes reuses old cached sessions for query execution.
# This makes the LoginSuccess/Logout entry count unstable, but every success must still have a matching logout.

for interface in 'TCP' 'HTTP' 'MySQL'
do
    LOGIN_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}'"`
    CORRESPONDING_LOGOUT_RECORDS_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' AND interface = '${interface}' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}')"`

    if [ "$LOGIN_COUNT" == "$CORRESPONDING_LOGOUT_RECORDS_COUNT" ]; then
        echo "${interface} Login and logout count is equal"
    else
        TOTAL_LOGOUT_COUNT=`${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout' AND interface = '${interface}'"`
        echo "${interface} Login count ${LOGIN_COUNT} != corresponding logout count ${CORRESPONDING_LOGOUT_RECORDS_COUNT}. TOTAL_LOGOUT_COUNT ${TOTAL_LOGOUT_COUNT}"
    fi
done

${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
@ -1,8 +0,0 @@
port_0_sessions:
0
address_0_sessions:
0
Corresponding LoginSuccess/Logout
9
LoginFailure
0
@ -1,114 +0,0 @@
#!/usr/bin/env bash
# Tags: no-debug

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

readonly PID=$$

readonly TEST_USER="02835_USER_${PID}"
readonly TEST_ROLE="02835_ROLE_${PID}"
readonly TEST_PROFILE="02835_PROFILE_${PID}"
readonly SESSION_LOG_MATCHING_FIELDS="auth_id, auth_type, client_version_major, client_version_minor, client_version_patch, interface"

function tcp_session()
{
    local user=$1
    ${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.numbers" --user="${user}"
}

function http_session()
{
    local user=$1
    ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}

function http_with_session_id_session()
{
    local user=$1
    ${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}&session_id=${user}&user=${user}&password=pass" -d "SELECT COUNT(*) FROM system.numbers"
}

# Busy-waits until the specified number of queries ($2) from user $1 are running simultaneously.
function wait_for_queries_start()
{
    local user=$1
    local queries_count=$2
    # 10 seconds waiting
    counter=0 retries=100
    while [[ $counter -lt $retries ]]; do
        result=$($CLICKHOUSE_CLIENT --query "SELECT COUNT(*) FROM system.processes WHERE user = '${user}'")
        if [[ $result == "${queries_count}" ]]; then
            break;
        fi
        sleep 0.1
        ((++counter))
    done
}

${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
${CLICKHOUSE_CLIENT} -q "DELETE FROM system.session_log WHERE user = '${TEST_USER}'"

# DROP USER CASE
${CLICKHOUSE_CLIENT} -q "CREATE USER IF NOT EXISTS ${TEST_USER}"
|
||||
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
|
||||
|
||||
export -f tcp_session;
|
||||
export -f http_session;
|
||||
export -f http_with_session_id_session;
|
||||
|
||||
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
|
||||
wait_for_queries_start $TEST_USER 3
|
||||
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
|
||||
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
|
||||
|
||||
wait
|
||||
|
||||
# DROP ROLE CASE
|
||||
${CLICKHOUSE_CLIENT} -q "CREATE ROLE IF NOT EXISTS ${TEST_ROLE}"
|
||||
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} DEFAULT ROLE ${TEST_ROLE}"
|
||||
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
|
||||
|
||||
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
|
||||
wait_for_queries_start $TEST_USER 3
|
||||
${CLICKHOUSE_CLIENT} -q "DROP ROLE ${TEST_ROLE}"
|
||||
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
|
||||
|
||||
wait
|
||||
|
||||
# DROP PROFILE CASE
|
||||
${CLICKHOUSE_CLIENT} -q "CREATE SETTINGS PROFILE IF NOT EXISTS '${TEST_PROFILE}'"
|
||||
${CLICKHOUSE_CLIENT} -q "CREATE USER ${TEST_USER} SETTINGS PROFILE '${TEST_PROFILE}'"
|
||||
${CLICKHOUSE_CLIENT} -q "GRANT SELECT ON system.numbers TO ${TEST_USER}"
|
||||
|
||||
timeout 10s bash -c "tcp_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
timeout 10s bash -c "http_with_session_id_session ${TEST_USER}" >/dev/null 2>&1 &
|
||||
|
||||
wait_for_queries_start $TEST_USER 3
|
||||
${CLICKHOUSE_CLIENT} -q "DROP SETTINGS PROFILE '${TEST_PROFILE}'"
|
||||
${CLICKHOUSE_CLIENT} -q "DROP USER ${TEST_USER}"
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = '${TEST_USER}' SYNC" >/dev/null &
|
||||
|
||||
wait
|
||||
|
||||
${CLICKHOUSE_CLIENT} -q "SYSTEM FLUSH LOGS"
|
||||
|
||||
echo "port_0_sessions:"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_port = 0"
|
||||
echo "address_0_sessions:"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT count(*) FROM system.session_log WHERE user = '${TEST_USER}' AND client_address = toIPv6('::')"
|
||||
echo "Corresponding LoginSuccess/Logout"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM (SELECT ${SESSION_LOG_MATCHING_FIELDS} FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginSuccess' INTERSECT SELECT ${SESSION_LOG_MATCHING_FIELDS}, FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'Logout')"
|
||||
echo "LoginFailure"
|
||||
${CLICKHOUSE_CLIENT} -q "SELECT COUNT(*) FROM system.session_log WHERE user = '${TEST_USER}' AND type = 'LoginFailure'"
|
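All three DROP cases above follow the same skeleton: start long-running queries in the background, poll system.processes until they are visible, drop the access entity, then kill the remaining queries and wait. A compressed sketch of that pattern, assuming the same shell_config environment; the user name and query below are placeholders, not the test's values:

    # Start a long query in the background for a hypothetical user.
    ${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.numbers" --user="some_user" >/dev/null 2>&1 &

    # Poll until at least one query from that user is visible (bounded wait).
    for _ in $(seq 1 100); do
        running=$(${CLICKHOUSE_CLIENT} -q "SELECT count() FROM system.processes WHERE user = 'some_user'")
        [[ $running -ge 1 ]] && break
        sleep 0.1
    done

    # Drop the entity while the query is still running, then clean up.
    ${CLICKHOUSE_CLIENT} -q "DROP USER some_user"
    ${CLICKHOUSE_CLIENT} -q "KILL QUERY WHERE user = 'some_user' SYNC" >/dev/null
    wait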
@ -0,0 +1,4 @@
There is no handle /sashboards. Maybe you meant /dashboard
There is no handle /sashboard. Maybe you meant /dashboard
There is no handle /sashboarb. Maybe you meant /dashboard
There is no handle /sashboaxb. Maybe you meant /dashboard
12
tests/queries/0_stateless/02842_suggest_http_page_in_error_message.sh
Executable file
@ -0,0 +1,12 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

export CLICKHOUSE_URL="${CLICKHOUSE_PORT_HTTP_PROTO}://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTP}/"

${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}sashboards" | grep -o ".* Maybe you meant /dashboard"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}sashboard" | grep -o ".* Maybe you meant /dashboard"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}sashboarb" | grep -o ".* Maybe you meant /dashboard"
${CLICKHOUSE_CURL} -sS "${CLICKHOUSE_URL}sashboaxb" | grep -o ".* Maybe you meant /dashboard"