Merge branch 'master' into add-ext-dict-redis

# Conflicts:
#	dbms/tests/integration/image/Dockerfile
comunodi 2019-04-06 18:48:45 +03:00
commit dd121eff06
155 changed files with 2703 additions and 785 deletions

View File

@ -1,3 +1,8 @@
## ClickHouse release 19.4.2.7, 2019-03-30
### Bug Fixes
* Fixed reading from `Array(LowCardinality)` column in rare case when column contained a long sequence of empty arrays. [#4850](https://github.com/yandex/ClickHouse/pull/4850) ([Nikolai Kochetov](https://github.com/KochetovNicolai))
## ClickHouse release 19.4.1.3, 2019-03-19
### Bug Fixes

View File

@ -178,7 +178,7 @@ include (cmake/use_libcxx.cmake)
# This is intended for more control of what we are linking.
set (DEFAULT_LIBS "")
if (OS_LINUX AND NOT UNBUNDLED)
if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_LIBCXX))
# Note: this probably has no effect, but I'm not an expert in CMake.
set (CMAKE_C_IMPLICIT_LINK_LIBRARIES "")
set (CMAKE_CXX_IMPLICIT_LINK_LIBRARIES "")

View File

@ -10,7 +10,3 @@ ClickHouse is an open-source column-oriented database management system that all
* [Blog](https://clickhouse.yandex/blog/en/) contains various ClickHouse-related articles, as well as announcements and reports about events.
* [Contacts](https://clickhouse.yandex/#contacts) can help to get your questions answered if there are any.
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Community Meetup](https://www.eventbrite.com/e/clickhouse-meetup-in-madrid-registration-55376746339) in Madrid on April 2.

View File

@ -20,7 +20,7 @@ set (CONFIG_VERSION ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config_version.h)
set (CONFIG_COMMON ${CMAKE_CURRENT_BINARY_DIR}/src/Common/config.h)
include (cmake/version.cmake)
message (STATUS "Will build ${VERSION_FULL} revision ${VERSION_REVISION}")
message (STATUS "Will build ${VERSION_FULL} revision ${VERSION_REVISION} ${VERSION_OFFICIAL}")
configure_file (src/Common/config.h.in ${CONFIG_COMMON})
configure_file (src/Common/config_version.h.in ${CONFIG_VERSION})

View File

@ -1,11 +1,11 @@
# These strings are autochanged from release_lib.sh:
set(VERSION_REVISION 54417)
set(VERSION_REVISION 54418)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 5)
set(VERSION_MINOR 6)
set(VERSION_PATCH 1)
set(VERSION_GITHASH 628ed349c335b79a441a1bd6e4bc791d61dfe62c)
set(VERSION_DESCRIBE v19.5.1.1-testing)
set(VERSION_STRING 19.5.1.1)
set(VERSION_GITHASH 30d3496c36cf3945c9828ac0b7cf7d1774a9f845)
set(VERSION_DESCRIBE v19.6.1.1-testing)
set(VERSION_STRING 19.6.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")
@ -24,3 +24,7 @@ set (VERSION_FULL "${VERSION_NAME} ${VERSION_STRING}")
set (VERSION_SO "${VERSION_STRING}")
math (EXPR VERSION_INTEGER "${VERSION_PATCH} + ${VERSION_MINOR}*1000 + ${VERSION_MAJOR}*1000000")
if(YANDEX_OFFICIAL_BUILD)
set(VERSION_OFFICIAL " (official build)")
endif()

View File

@ -797,14 +797,33 @@ private:
written_progress_chars = 0;
written_first_block = false;
{
/// Temporarily apply query settings to context.
std::optional<Settings> old_settings;
SCOPE_EXIT({ if (old_settings) context.setSettings(*old_settings); });
auto apply_query_settings = [&](const IAST & settings_ast)
{
if (!old_settings)
old_settings.emplace(context.getSettingsRef());
for (const auto & change : settings_ast.as<ASTSetQuery>()->changes)
context.setSetting(change.name, change.value);
};
const auto * insert = parsed_query->as<ASTInsertQuery>();
if (insert && insert->settings_ast)
apply_query_settings(*insert->settings_ast);
/// FIXME: try to prettify this cast using `as<>()`
const auto * with_output = dynamic_cast<const ASTQueryWithOutput *>(parsed_query.get());
if (with_output && with_output->settings_ast)
apply_query_settings(*with_output->settings_ast);
connection->forceConnected();
/// INSERT query for which data transfer is needed (not an INSERT SELECT) is processed separately.
const auto * insert_query = parsed_query->as<ASTInsertQuery>();
if (insert_query && !insert_query->select)
if (insert && !insert->select)
processInsertQuery();
else
processOrdinaryQuery();
}
/// Do not change context (current DB, settings) in case of an exception.
if (!got_exception)
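
To make the settings-restore trick above concrete, here is a minimal self-contained sketch of the same pattern. `Context`, `Settings`, and `ScopeGuard` are simplified stand-ins for the real ClickHouse types and the SCOPE_EXIT macro, not the actual implementation:

```cpp
#include <functional>
#include <map>
#include <optional>
#include <string>

// Stand-in for DB::Settings: just a name -> value map.
using Settings = std::map<std::string, std::string>;

// Stand-in for DB::Context.
struct Context
{
    Settings settings;
    const Settings & getSettingsRef() const { return settings; }
    void setSettings(const Settings & s) { settings = s; }
    void setSetting(const std::string & name, const std::string & value) { settings[name] = value; }
};

// Minimal scope guard playing the role of the SCOPE_EXIT macro above.
struct ScopeGuard
{
    std::function<void()> fn;
    ~ScopeGuard() { if (fn) fn(); }
};

int main()
{
    Context context;

    {
        std::optional<Settings> old_settings;
        ScopeGuard restore{[&] { if (old_settings) context.setSettings(*old_settings); }};

        // Save the previous settings, then apply the per-query overrides
        // (the real code does this lazily, only when the query carries
        // a SETTINGS clause).
        old_settings.emplace(context.getSettingsRef());
        context.setSetting("max_threads", "1");

        // ... run the query; whether it returns or throws, the guard
        // restores the previous settings when this scope ends.
    }

    // Here context.settings is back in its original (empty) state.
}
```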
@ -964,8 +983,6 @@ private:
{
if (!insert->format.empty())
current_format = insert->format;
if (insert->settings_ast)
InterpreterSetQuery(insert->settings_ast, context).executeForCurrentContext();
}
BlockInputStreamPtr block_input = context.getInputFormat(
@ -1248,10 +1265,6 @@ private:
const auto & id = query_with_output->format->as<ASTIdentifier &>();
current_format = id.name;
}
if (query_with_output->settings_ast)
{
InterpreterSetQuery(query_with_output->settings_ast, context).executeForCurrentContext();
}
}
if (has_vertical_output_suffix)
@ -1510,7 +1523,7 @@ private:
void showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << std::endl;
std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
}
public:

View File

@ -1,6 +1,7 @@
#include <iostream>
#include <optional>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/join.hpp>
#include <Common/Exception.h>
#include <IO/WriteBufferFromFileDescriptor.h>
@ -9,6 +10,8 @@
#include <Compression/CompressedReadBuffer.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionFactory.h>
@ -64,7 +67,7 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
("hc", "use LZ4HC instead of LZ4")
("zstd", "use ZSTD instead of LZ4")
("codec", boost::program_options::value<std::vector<std::string>>()->multitoken(), "use codecs combination instead of LZ4")
("level", boost::program_options::value<std::vector<int>>()->multitoken(), "compression levels for codecs specified via --codec")
("level", boost::program_options::value<int>(), "compression level for codecs spicified via flags")
("none", "use no compression instead of LZ4")
("stat", "print block statistics of compressed data")
;
@ -94,6 +97,9 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
if ((use_lz4hc || use_zstd || use_none) && !codecs.empty())
throw DB::Exception("Wrong options, codec flags like --zstd and --codec options are mutually exclusive", DB::ErrorCodes::BAD_ARGUMENTS);
if (!codecs.empty() && options.count("level"))
throw DB::Exception("Wrong options, --level is not compatible with --codec list", DB::ErrorCodes::BAD_ARGUMENTS);
std::string method_family = "LZ4";
if (use_lz4hc)
@ -103,28 +109,22 @@ int mainEntryClickHouseCompressor(int argc, char ** argv)
else if (use_none)
method_family = "NONE";
std::vector<int> levels;
std::optional<int> level = std::nullopt;
if (options.count("level"))
levels = options["level"].as<std::vector<int>>();
level = options["level"].as<int>();
DB::CompressionCodecPtr codec;
if (!codecs.empty())
{
if (levels.size() > codecs.size())
throw DB::Exception("Specified more levels than codecs", DB::ErrorCodes::BAD_ARGUMENTS);
DB::ParserCodec codec_parser;
std::vector<DB::CodecNameWithLevel> codec_names;
for (size_t i = 0; i < codecs.size(); ++i)
{
if (i < levels.size())
codec_names.emplace_back(codecs[i], levels[i]);
else
codec_names.emplace_back(codecs[i], std::nullopt);
}
codec = DB::CompressionCodecFactory::instance().get(codec_names);
std::string codecs_line = boost::algorithm::join(codecs, ",");
auto ast = DB::parseQuery(codec_parser, "(" + codecs_line + ")", 0);
codec = DB::CompressionCodecFactory::instance().get(ast, nullptr);
}
else
codec = DB::CompressionCodecFactory::instance().get(method_family, levels.empty() ? std::nullopt : std::optional<int>(levels.back()));
codec = DB::CompressionCodecFactory::instance().get(method_family, level);
DB::ReadBufferFromFileDescriptor rb(STDIN_FILENO);

View File

@ -17,11 +17,11 @@ $ ./clickhouse-compressor --decompress < input_file > output_file
Compress data with ZSTD at level 5:
```
$ ./clickhouse-compressor --codec ZSTD --level 5 < input_file > output_file
$ ./clickhouse-compressor --codec 'ZSTD(5)' < input_file > output_file
```
Compress data with ZSTD level 10, LZ4HC level 7 and LZ4.
Compress data with Delta of four bytes and ZSTD level 10.
```
$ ./clickhouse-compressor --codec ZSTD --level 5 --codec LZ4HC --level 7 --codec LZ4 < input_file > output_file
$ ./clickhouse-compressor --codec 'Delta(4)' --codec 'ZSTD(10)' < input_file > output_file
```

View File

@ -1,7 +1,6 @@
#include "ClusterCopier.h"
#include <chrono>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/Logger.h>
#include <Poco/ConsoleChannel.h>
@ -13,14 +12,11 @@
#include <Poco/FileChannel.h>
#include <Poco/SplitterChannel.h>
#include <Poco/Util/HelpFormatter.h>
#include <boost/algorithm/string.hpp>
#include <pcg_random.hpp>
#include <common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <daemon/OwnPatternFormatter.h>
#include <Common/Exception.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Common/ZooKeeper/KeeperException.h>
@ -61,6 +57,7 @@
#include <DataStreams/NullBlockOutputStream.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadBufferFromFile.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
@ -500,9 +497,6 @@ static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
ErrorCodes::BAD_ARGUMENTS);
}
ASTPtr arguments_ast = engine.arguments->clone();
ASTs & arguments = arguments_ast->children;
if (isExtendedDefinitionStorage(storage_ast))
{
if (storage.partition_by)
@ -516,6 +510,12 @@ static ASTPtr extractPartitionKey(const ASTPtr & storage_ast)
bool is_replicated = startsWith(engine.name, "Replicated");
size_t min_args = is_replicated ? 3 : 1;
if (!engine.arguments)
throw Exception("Expected arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
ASTPtr arguments_ast = engine.arguments->clone();
ASTs & arguments = arguments_ast->children;
if (arguments.size() < min_args)
throw Exception("Expected at least " + toString(min_args) + " arguments in " + storage_str, ErrorCodes::BAD_ARGUMENTS);
@ -894,6 +894,28 @@ public:
}
}
void uploadTaskDescription(const std::string & task_path, const std::string & task_file, const bool force)
{
auto local_task_description_path = task_path + "/description";
String task_config_str;
{
ReadBufferFromFile in(task_file);
readStringUntilEOF(task_config_str, in);
}
if (task_config_str.empty())
return;
auto zookeeper = context.getZooKeeper();
zookeeper->createAncestors(local_task_description_path);
auto code = zookeeper->tryCreate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
if (code && force)
zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
LOG_DEBUG(log, "Task description " << ((code && !force) ? "not " : "") << "uploaded to " << local_task_description_path << " with result " << code << " (" << zookeeper->error2string(code) << ")");
}
void reloadTaskDescription()
{
auto zookeeper = context.getZooKeeper();
@ -1201,7 +1223,8 @@ protected:
auto new_columns_list = std::make_shared<ASTColumns>();
new_columns_list->set(new_columns_list->columns, new_columns);
new_columns_list->set(new_columns_list->indices, query_ast->as<ASTCreateQuery>()->columns_list->indices->clone());
if (auto indices = query_ast->as<ASTCreateQuery>()->columns_list->indices)
new_columns_list->set(new_columns_list->indices, indices->clone());
new_query.replace(new_query.columns_list, new_columns_list);
@ -2103,6 +2126,10 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
options.addOption(Poco::Util::Option("task-path", "", "path to task in ZooKeeper")
.argument("task-path").binding("task-path"));
options.addOption(Poco::Util::Option("task-file", "", "path to task file for uploading in ZooKeeper to task-path")
.argument("task-file").binding("task-file"));
options.addOption(Poco::Util::Option("task-upload-force", "", "Force upload task-file even node already exists")
.argument("task-upload-force").binding("task-upload-force"));
options.addOption(Poco::Util::Option("safe-mode", "", "disables ALTER DROP PARTITION in case of errors")
.binding("safe-mode"));
options.addOption(Poco::Util::Option("copy-fault-probability", "", "the copying fails with specified probability (used to test partition state recovering)")
@ -2153,6 +2180,11 @@ void ClusterCopierApp::mainImpl()
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
auto task_file = config().getString("task-file", "");
if (!task_file.empty())
copier->uploadTaskDescription(task_path, task_file, config().getBool("task-upload-force", false));
copier->init();
copier->process();
}

View File

@ -369,7 +369,7 @@ void LocalServer::setupUsers()
static void showClientVersion()
{
std::cout << DBMS_NAME << " client version " << VERSION_STRING << "." << '\n';
std::cout << DBMS_NAME << " client version " << VERSION_STRING << VERSION_OFFICIAL << "." << '\n';
}
std::string LocalServer::getHelpHeader() const

View File

@ -132,7 +132,7 @@ int Server::run()
}
if (config().hasOption("version"))
{
std::cout << DBMS_NAME << " server version " << VERSION_STRING << "." << std::endl;
std::cout << DBMS_NAME << " server version " << VERSION_STRING << VERSION_OFFICIAL << "." << std::endl;
return 0;
}
return Application::run();

View File

@ -0,0 +1,85 @@
#include <AggregateFunctions/AggregateFunctionLeastSqr.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionLeastSqr(
const String & name,
const DataTypes & arguments,
const Array & params
)
{
assertNoParameters(name, params);
assertBinary(name, arguments);
const IDataType * x_arg = arguments.front().get();
WhichDataType which_x {
x_arg
};
const IDataType * y_arg = arguments.back().get();
WhichDataType which_y {
y_arg
};
#define FOR_LEASTSQR_TYPES_2(M, T) \
M(T, UInt8) \
M(T, UInt16) \
M(T, UInt32) \
M(T, UInt64) \
M(T, Int8) \
M(T, Int16) \
M(T, Int32) \
M(T, Int64) \
M(T, Float32) \
M(T, Float64)
#define FOR_LEASTSQR_TYPES(M) \
FOR_LEASTSQR_TYPES_2(M, UInt8) \
FOR_LEASTSQR_TYPES_2(M, UInt16) \
FOR_LEASTSQR_TYPES_2(M, UInt32) \
FOR_LEASTSQR_TYPES_2(M, UInt64) \
FOR_LEASTSQR_TYPES_2(M, Int8) \
FOR_LEASTSQR_TYPES_2(M, Int16) \
FOR_LEASTSQR_TYPES_2(M, Int32) \
FOR_LEASTSQR_TYPES_2(M, Int64) \
FOR_LEASTSQR_TYPES_2(M, Float32) \
FOR_LEASTSQR_TYPES_2(M, Float64)
#define DISPATCH(T1, T2) \
if (which_x.idx == TypeIndex::T1 && which_y.idx == TypeIndex::T2) \
return std::make_shared<AggregateFunctionLeastSqr<T1, T2>>( \
arguments, \
params \
);
FOR_LEASTSQR_TYPES(DISPATCH)
#undef FOR_LEASTSQR_TYPES_2
#undef FOR_LEASTSQR_TYPES
#undef DISPATCH
throw Exception(
"Illegal types ("
+ x_arg->getName() + ", " + y_arg->getName()
+ ") of arguments of aggregate function " + name
+ ", must be Native Ints, Native UInts or Floats",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT
);
}
}
void registerAggregateFunctionLeastSqr(AggregateFunctionFactory & factory)
{
factory.registerFunction("leastSqr", createAggregateFunctionLeastSqr);
}
}

View File

@ -0,0 +1,195 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <limits>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
template <typename X, typename Y, typename Ret>
struct AggregateFunctionLeastSqrData final
{
size_t count = 0;
Ret sum_x = 0;
Ret sum_y = 0;
Ret sum_xx = 0;
Ret sum_xy = 0;
void add(X x, Y y)
{
count += 1;
sum_x += x;
sum_y += y;
sum_xx += x * x;
sum_xy += x * y;
}
void merge(const AggregateFunctionLeastSqrData & other)
{
count += other.count;
sum_x += other.sum_x;
sum_y += other.sum_y;
sum_xx += other.sum_xx;
sum_xy += other.sum_xy;
}
void serialize(WriteBuffer & buf) const
{
writeBinary(count, buf);
writeBinary(sum_x, buf);
writeBinary(sum_y, buf);
writeBinary(sum_xx, buf);
writeBinary(sum_xy, buf);
}
void deserialize(ReadBuffer & buf)
{
readBinary(count, buf);
readBinary(sum_x, buf);
readBinary(sum_y, buf);
readBinary(sum_xx, buf);
readBinary(sum_xy, buf);
}
Ret getK() const
{
Ret divisor = sum_xx * count - sum_x * sum_x;
if (divisor == 0)
return std::numeric_limits<Ret>::quiet_NaN();
return (sum_xy * count - sum_x * sum_y) / divisor;
}
Ret getB(Ret k) const
{
if (count == 0)
return std::numeric_limits<Ret>::quiet_NaN();
return (sum_y - k * sum_x) / count;
}
};
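
For reference, `getK()` and `getB()` above evaluate the standard closed-form least-squares solution over the accumulated sums, with \( n \) being `count`:

```latex
k = \frac{n \sum x_i y_i - \sum x_i \sum y_i}{n \sum x_i^2 - \left( \sum x_i \right)^2},
\qquad
b = \frac{\sum y_i - k \sum x_i}{n}
```

When all x values coincide, the denominator of k vanishes, which is why `getK()` returns NaN in that case.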
/// Calculates simple linear regression parameters.
/// Result is a tuple (k, b) for y = k * x + b equation, solved by least squares approximation.
template <typename X, typename Y, typename Ret = Float64>
class AggregateFunctionLeastSqr final : public IAggregateFunctionDataHelper<
AggregateFunctionLeastSqrData<X, Y, Ret>,
AggregateFunctionLeastSqr<X, Y, Ret>
>
{
public:
AggregateFunctionLeastSqr(
const DataTypes & arguments,
const Array & params
):
IAggregateFunctionDataHelper<
AggregateFunctionLeastSqrData<X, Y, Ret>,
AggregateFunctionLeastSqr<X, Y, Ret>
> {arguments, params}
{
// note: arguments have been checked before
}
String getName() const override
{
return "leastSqr";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
void add(
AggregateDataPtr place,
const IColumn ** columns,
size_t row_num,
Arena *
) const override
{
auto col_x {
static_cast<const ColumnVector<X> *>(columns[0])
};
auto col_y {
static_cast<const ColumnVector<Y> *>(columns[1])
};
X x = col_x->getData()[row_num];
Y y = col_y->getData()[row_num];
this->data(place).add(x, y);
}
void merge(
AggregateDataPtr place,
ConstAggregateDataPtr rhs, Arena *
) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(
ConstAggregateDataPtr place,
WriteBuffer & buf
) const override
{
this->data(place).serialize(buf);
}
void deserialize(
AggregateDataPtr place,
ReadBuffer & buf, Arena *
) const override
{
this->data(place).deserialize(buf);
}
DataTypePtr getReturnType() const override
{
DataTypes types {
std::make_shared<DataTypeNumber<Ret>>(),
std::make_shared<DataTypeNumber<Ret>>(),
};
Strings names {
"k",
"b",
};
return std::make_shared<DataTypeTuple>(
std::move(types),
std::move(names)
);
}
void insertResultInto(
ConstAggregateDataPtr place,
IColumn & to
) const override
{
Ret k = this->data(place).getK();
Ret b = this->data(place).getB(k);
auto & col_tuple = static_cast<ColumnTuple &>(to);
auto & col_k = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(0));
auto & col_b = static_cast<ColumnVector<Ret> &>(col_tuple.getColumn(1));
col_k.getData().push_back(k);
col_b.getData().push_back(b);
}
};
}

View File

@ -16,7 +16,6 @@
#include <Common/HashTable/HashSet.h>
#include <Common/HyperLogLogWithSmallSetOptimization.h>
#include <Common/CombinedCardinalityEstimator.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <AggregateFunctions/UniquesHashSet.h>

View File

@ -29,6 +29,7 @@ void registerAggregateFunctionsBitwise(AggregateFunctionFactory &);
void registerAggregateFunctionsBitmap(AggregateFunctionFactory &);
void registerAggregateFunctionsMaxIntersections(AggregateFunctionFactory &);
void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
void registerAggregateFunctionLeastSqr(AggregateFunctionFactory &);
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorArray(AggregateFunctionCombinatorFactory &);
@ -69,6 +70,7 @@ void registerAggregateFunctions()
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
registerAggregateFunctionEntropy(factory);
registerAggregateFunctionLeastSqr(factory);
}
{

View File

@ -10,7 +10,7 @@ namespace DB
/** Aligned piece of memory.
* It can only be allocated and destroyed.
* MemoryTracker is not used. It is intended for small pieces of memory.
* MemoryTracker is not used. AlignedBuffer is intended for small pieces of memory.
*/
class AlignedBuffer : private boost::noncopyable
{

View File

@ -421,6 +421,7 @@ namespace ErrorCodes
extern const int UNKNOWN_PROTOBUF_FORMAT = 444;
extern const int CANNOT_MPROTECT = 445;
extern const int FUNCTION_NOT_ALLOWED = 446;
extern const int HYPERSCAN_CANNOT_SCAN_TEXT = 447;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -21,11 +21,6 @@ namespace ErrorCodes
extern const int CANNOT_TRUNCATE_FILE;
}
const char * getVersion()
{
return VERSION_STRING;
}
std::string errnoToString(int code, int e)
{
const size_t buf_size = 128;
@ -82,14 +77,15 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
}
catch (const Exception & e)
{
stream << "(version " << getVersion() << ") " << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace);
stream << getExceptionMessage(e, with_stacktrace, check_embedded_stacktrace) << " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (const Poco::Exception & e)
{
try
{
stream << "(version " << getVersion() << ") " << "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText();
stream << "Poco::Exception. Code: " << ErrorCodes::POCO_EXCEPTION << ", e.code() = " << e.code()
<< ", e.displayText() = " << e.displayText()
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
}
@ -103,7 +99,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
if (status)
name += " (demangling status: " + toString(status) + ")";
stream << "(version " << getVersion() << ") " << "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", type: " << name << ", e.what() = " << e.what();
stream << "std::exception. Code: " << ErrorCodes::STD_EXCEPTION << ", type: " << name << ", e.what() = " << e.what() << ", version = " << VERSION_STRING << VERSION_OFFICIAL;
}
catch (...) {}
}
@ -117,7 +113,7 @@ std::string getCurrentExceptionMessage(bool with_stacktrace, bool check_embedded
if (status)
name += " (demangling status: " + toString(status) + ")";
stream << "(version " << getVersion() << ") " << "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ", type: " << name;
stream << "Unknown exception. Code: " << ErrorCodes::UNKNOWN_EXCEPTION << ", type: " << name << " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
}

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
* template parameter is available as Creator
*/
template <typename CreatorFunc>
class IFactoryWithAliases
class IFactoryWithAliases : public IHints<2, IFactoryWithAliases<CreatorFunc>>
{
protected:
using Creator = CreatorFunc;
@ -76,7 +76,7 @@ public:
throw Exception(factory_name + ": alias name '" + alias_name + "' is not unique", ErrorCodes::LOGICAL_ERROR);
}
std::vector<String> getAllRegisteredNames() const
std::vector<String> getAllRegisteredNames() const override
{
std::vector<String> result;
auto getter = [](const auto & pair) { return pair.first; };
@ -106,13 +106,7 @@ public:
return aliases.count(name) || case_insensitive_aliases.count(name);
}
std::vector<String> getHints(const String & name) const
{
static const auto registered_names = getAllRegisteredNames();
return prompter.getHints(name, registered_names);
}
virtual ~IFactoryWithAliases() {}
virtual ~IFactoryWithAliases() override {}
private:
using InnerMap = std::unordered_map<String, Creator>; // name -> creator
@ -127,13 +121,6 @@ private:
/// Case insensitive aliases
AliasMap case_insensitive_aliases;
/**
* prompter for names, if a person makes a typo for some function or type, it
* helps to find best possible match (in particular, edit distance is done like in clang
* (max edit distance is (typo.size() + 2) / 3)
*/
NamePrompter</*MaxNumHints=*/2> prompter;
};
}

View File

@ -97,4 +97,23 @@ private:
}
};
template <size_t MaxNumHints, class Self>
class IHints
{
public:
virtual std::vector<String> getAllRegisteredNames() const = 0;
std::vector<String> getHints(const String & name) const
{
static const auto registered_names = getAllRegisteredNames();
return prompter.getHints(name, registered_names);
}
virtual ~IHints() = default;
private:
NamePrompter<MaxNumHints> prompter;
};
}
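
A minimal sketch of how the new `IHints` mixin is consumed: a factory only has to implement `getAllRegisteredNames()`, and `getHints()` supplies typo suggestions on top of it. `ToyFunctionRegistry` and its names are hypothetical, and the code assumes it lives inside the `DB` namespace so that `String` resolves:

```cpp
// Hypothetical consumer of the IHints mixin defined above.
class ToyFunctionRegistry : public IHints<2, ToyFunctionRegistry>
{
public:
    std::vector<String> getAllRegisteredNames() const override
    {
        return {"sum", "uniq", "avg"};
    }
};

// ToyFunctionRegistry registry;
// registry.getHints("unic");   // likely suggests {"uniq"}
```

Note that `getHints()` caches `getAllRegisteredNames()` in a function-local static, so names registered after the first lookup will not influence later hints.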

View File

@ -0,0 +1,52 @@
#pragma once
#include <vector>
//#include <Common/PODArray.h>
namespace DB
{
/**
* A container to push sortable data into.
* When values are looked up, the container ensures it is sorted, giving O(log N) lookups.
*
* Note: this is only efficient when all insertions happen in one stage, followed by all retrievals.
* That way the data is sorted only once.
*/
template <typename T>
class SortedLookupPODArray
{
public:
using Base = std::vector<T>;
//using Base = PaddedPODArray<T>;
template <typename U, typename ... TAllocatorParams>
void insert(U && x, TAllocatorParams &&... allocator_params)
{
array.push_back(std::forward<U>(x), std::forward<TAllocatorParams>(allocator_params)...);
sorted = false;
}
typename Base::const_iterator upper_bound(const T & k)
{
if (!sorted)
sort();
return std::upper_bound(array.cbegin(), array.cend(), k);
}
typename Base::const_iterator cbegin() const { return array.cbegin(); }
typename Base::const_iterator cend() const { return array.cend(); }
private:
Base array;
bool sorted = false;
void sort()
{
std::sort(array.begin(), array.end());
sorted = true;
}
};
}
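
An illustrative use of the container under its intended insert-then-query pattern, assuming the class above; the values and helper function are arbitrary:

```cpp
#include <cassert>

void sortedLookupExample()
{
    DB::SortedLookupPODArray<int> lookup;
    for (int v : {5, 1, 4, 2, 3})
        lookup.insert(v);                 // unsorted pushes, `sorted` stays false

    auto it = lookup.upper_bound(2);      // first lookup sorts once, then binary-searches
    assert(*it == 3);                     // first element strictly greater than 2

    lookup.insert(0);                     // a later insert clears `sorted`,
    assert(*lookup.upper_bound(-1) == 0); // forcing a re-sort on the next lookup
}
```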

View File

@ -156,7 +156,7 @@ public:
#endif
}
bool compare(const UInt8 * pos) const
ALWAYS_INLINE bool compare(const UInt8 * pos) const
{
static const Poco::UTF8Encoding utf8;
@ -374,7 +374,7 @@ public:
#endif
}
bool compare(const UInt8 * pos) const
ALWAYS_INLINE bool compare(const UInt8 * pos) const
{
#ifdef __SSE4_1__
if (pageSafe(pos))
@ -568,7 +568,7 @@ public:
#endif
}
bool compare(const UInt8 * pos) const
ALWAYS_INLINE bool compare(const UInt8 * pos) const
{
#ifdef __SSE4_1__
if (pageSafe(pos))

View File

@ -20,6 +20,7 @@
#cmakedefine VERSION_MINOR @VERSION_MINOR@
#cmakedefine VERSION_PATCH @VERSION_PATCH@
#cmakedefine VERSION_STRING "@VERSION_STRING@"
#cmakedefine VERSION_OFFICIAL "@VERSION_OFFICIAL@"
#cmakedefine VERSION_FULL "@VERSION_FULL@"
#cmakedefine VERSION_DESCRIBE "@VERSION_DESCRIBE@"
#cmakedefine VERSION_GITHASH "@VERSION_GITHASH@"
@ -42,3 +43,7 @@
#else
#define DBMS_VERSION_PATCH 0
#endif
#if !defined(VERSION_OFFICIAL)
# define VERSION_OFFICIAL ""
#endif

View File

@ -125,10 +125,10 @@ void CompressionCodecDelta::doDecompressData(const char * source, UInt32 source_
}
}
void registerCodecDelta(CompressionCodecFactory & factory)
namespace
{
UInt8 method_code = UInt8(CompressionMethodByte::Delta);
factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
UInt8 getDeltaBytesSize(DataTypePtr column_type)
{
UInt8 delta_bytes_size = 1;
if (column_type && column_type->haveMaximumSizeOfValue())
@ -137,7 +137,22 @@ void registerCodecDelta(CompressionCodecFactory & factory)
if (max_size == 1 || max_size == 2 || max_size == 4 || max_size == 8)
delta_bytes_size = static_cast<UInt8>(max_size);
}
return delta_bytes_size;
}
}
void CompressionCodecDelta::useInfoAboutType(DataTypePtr data_type)
{
delta_bytes_size = getDeltaBytesSize(data_type);
}
void registerCodecDelta(CompressionCodecFactory & factory)
{
UInt8 method_code = UInt8(CompressionMethodByte::Delta);
factory.registerCompressionCodecWithType("Delta", method_code, [&](const ASTPtr & arguments, DataTypePtr column_type) -> CompressionCodecPtr
{
UInt8 delta_bytes_size = getDeltaBytesSize(column_type);
if (arguments && !arguments->children.empty())
{
if (arguments->children.size() > 1)

View File

@ -1,6 +1,7 @@
#pragma once
#include <Compression/ICompressionCodec.h>
namespace DB
{
@ -13,14 +14,18 @@ public:
String getCodecDesc() const override;
void useInfoAboutType(DataTypePtr data_type) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override { return uncompressed_size + 2; }
private:
const UInt8 delta_bytes_size;
};
}
private:
UInt8 delta_bytes_size;
};
}

View File

@ -21,16 +21,6 @@ extern const int CORRUPTED_DATA;
CompressionCodecMultiple::CompressionCodecMultiple(Codecs codecs)
: codecs(codecs)
{
std::ostringstream ss;
for (size_t idx = 0; idx < codecs.size(); idx++)
{
if (idx != 0)
ss << ',' << ' ';
const auto codec = codecs[idx];
ss << codec->getCodecDesc();
}
codec_desc = ss.str();
}
UInt8 CompressionCodecMultiple::getMethodByte() const
@ -40,7 +30,16 @@ UInt8 CompressionCodecMultiple::getMethodByte() const
String CompressionCodecMultiple::getCodecDesc() const
{
return codec_desc;
std::ostringstream ss;
for (size_t idx = 0; idx < codecs.size(); idx++)
{
if (idx != 0)
ss << ',' << ' ';
const auto codec = codecs[idx];
ss << codec->getCodecDesc();
}
return ss.str();
}
UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const
@ -79,6 +78,14 @@ UInt32 CompressionCodecMultiple::doCompressData(const char * source, UInt32 sour
return 1 + codecs.size() + source_size;
}
void CompressionCodecMultiple::useInfoAboutType(DataTypePtr data_type)
{
for (auto & codec : codecs)
{
codec->useInfoAboutType(data_type);
}
}
void CompressionCodecMultiple::doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const
{
UInt8 compression_methods_size = source[0];

View File

@ -17,6 +17,8 @@ public:
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void useInfoAboutType(DataTypePtr data_type) override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
@ -24,7 +26,6 @@ protected:
private:
Codecs codecs;
String codec_desc;
};

View File

@ -42,17 +42,6 @@ CompressionCodecPtr CompressionCodecFactory::get(const String & family_name, std
}
}
CompressionCodecPtr CompressionCodecFactory::get(const std::vector<CodecNameWithLevel> & codecs) const
{
Codecs result;
for (const auto & [codec_name, level] : codecs)
result.push_back(get(codec_name, level));
if (result.size() == 1)
return result.back();
return std::make_shared<CompressionCodecMultiple>(result);
}
CompressionCodecPtr CompressionCodecFactory::get(const ASTPtr & ast, DataTypePtr column_type) const
{
@ -93,7 +82,7 @@ CompressionCodecPtr CompressionCodecFactory::get(const UInt8 byte_code) const
CompressionCodecPtr CompressionCodecFactory::getImpl(const String & family_name, const ASTPtr & arguments, DataTypePtr column_type) const
{
if (family_name == "Multiple")
throw Exception("Codec MULTIPLE cannot be specified directly", ErrorCodes::UNKNOWN_CODEC);
throw Exception("Codec Multiple cannot be specified directly", ErrorCodes::UNKNOWN_CODEC);
const auto family_and_creator = family_name_with_codec.find(family_name);

View File

@ -48,8 +48,6 @@ public:
/// For backward compatibility with config settings
CompressionCodecPtr get(const String & family_name, std::optional<int> level) const;
CompressionCodecPtr get(const std::vector<CodecNameWithLevel> & codecs) const;
/// Register codec with parameters and column type
void registerCompressionCodecWithType(const String & family_name, std::optional<UInt8> byte_code, CreatorWithType creator);
/// Register codec with parameters

View File

@ -58,6 +58,9 @@ public:
/// Read method byte from compressed source
static UInt8 readMethod(const char * source);
/// Some codecs may use information about the column type, which becomes available only after codec creation
virtual void useInfoAboutType(DataTypePtr /* data_type */) { }
protected:
/// Return size of compressed data without header

View File

@ -23,20 +23,21 @@ namespace DB
class TaskNotification final : public Poco::Notification
{
public:
explicit TaskNotification(const BackgroundSchedulePool::TaskInfoPtr & task) : task(task) {}
explicit TaskNotification(const BackgroundSchedulePoolTaskInfoPtr & task) : task(task) {}
void execute() { task->execute(); }
private:
BackgroundSchedulePool::TaskInfoPtr task;
BackgroundSchedulePoolTaskInfoPtr task;
};
BackgroundSchedulePool::TaskInfo::TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_)
BackgroundSchedulePoolTaskInfo::BackgroundSchedulePoolTaskInfo(
BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_)
: pool(pool_), log_name(log_name_), function(function_)
{
}
bool BackgroundSchedulePool::TaskInfo::schedule()
bool BackgroundSchedulePoolTaskInfo::schedule()
{
std::lock_guard lock(schedule_mutex);
@ -47,7 +48,7 @@ bool BackgroundSchedulePool::TaskInfo::schedule()
return true;
}
bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
bool BackgroundSchedulePoolTaskInfo::scheduleAfter(size_t ms)
{
std::lock_guard lock(schedule_mutex);
@ -58,7 +59,7 @@ bool BackgroundSchedulePool::TaskInfo::scheduleAfter(size_t ms)
return true;
}
void BackgroundSchedulePool::TaskInfo::deactivate()
void BackgroundSchedulePoolTaskInfo::deactivate()
{
std::lock_guard lock_exec(exec_mutex);
std::lock_guard lock_schedule(schedule_mutex);
@ -73,13 +74,13 @@ void BackgroundSchedulePool::TaskInfo::deactivate()
pool.cancelDelayedTask(shared_from_this(), lock_schedule);
}
void BackgroundSchedulePool::TaskInfo::activate()
void BackgroundSchedulePoolTaskInfo::activate()
{
std::lock_guard lock(schedule_mutex);
deactivated = false;
}
bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
bool BackgroundSchedulePoolTaskInfo::activateAndSchedule()
{
std::lock_guard lock(schedule_mutex);
@ -91,7 +92,7 @@ bool BackgroundSchedulePool::TaskInfo::activateAndSchedule()
return true;
}
void BackgroundSchedulePool::TaskInfo::execute()
void BackgroundSchedulePoolTaskInfo::execute()
{
Stopwatch watch;
CurrentMetrics::Increment metric_increment{CurrentMetrics::BackgroundSchedulePoolTask};
@ -131,7 +132,7 @@ void BackgroundSchedulePool::TaskInfo::execute()
}
}
void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
void BackgroundSchedulePoolTaskInfo::scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock)
{
scheduled = true;
@ -145,7 +146,7 @@ void BackgroundSchedulePool::TaskInfo::scheduleImpl(std::lock_guard<std::mutex>
pool.queue.enqueueNotification(new TaskNotification(shared_from_this()));
}
Coordination::WatchCallback BackgroundSchedulePool::TaskInfo::getWatchCallback()
Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
{
return [t = shared_from_this()](const Coordination::WatchResponse &)
{

View File

@ -20,6 +20,8 @@ namespace DB
{
class TaskNotification;
class BackgroundSchedulePoolTaskInfo;
class BackgroundSchedulePoolTaskHolder;
/** Executes functions scheduled at a specific point in time.
@ -35,84 +37,14 @@ class TaskNotification;
class BackgroundSchedulePool
{
public:
class TaskInfo;
friend class BackgroundSchedulePoolTaskInfo;
using TaskInfo = BackgroundSchedulePoolTaskInfo;
using TaskInfoPtr = std::shared_ptr<TaskInfo>;
using TaskFunc = std::function<void()>;
using TaskHolder = BackgroundSchedulePoolTaskHolder;
using DelayedTasks = std::multimap<Poco::Timestamp, TaskInfoPtr>;
class TaskInfo : public std::enable_shared_from_this<TaskInfo>, private boost::noncopyable
{
public:
TaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
TaskFunc function;
std::mutex exec_mutex;
std::mutex schedule_mutex;
/// Invariants:
/// * If deactivated is true then scheduled, delayed and executing are all false.
/// * scheduled and delayed cannot be true at the same time.
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
/// If the task is scheduled with delay, points to element of delayed_tasks.
DelayedTasks::iterator iterator;
};
class TaskHolder
{
public:
TaskHolder() = default;
explicit TaskHolder(const TaskInfoPtr & task_info_) : task_info(task_info_) {}
TaskHolder(const TaskHolder & other) = delete;
TaskHolder(TaskHolder && other) noexcept = default;
TaskHolder & operator=(const TaskHolder & other) noexcept = delete;
TaskHolder & operator=(TaskHolder && other) noexcept = default;
~TaskHolder()
{
if (task_info)
task_info->deactivate();
}
TaskInfo * operator->() { return task_info.get(); }
const TaskInfo * operator->() const { return task_info.get(); }
private:
TaskInfoPtr task_info;
};
TaskHolder createTask(const std::string & log_name, const TaskFunc & function);
size_t getNumberOfThreads() const { return size; }
@ -153,4 +85,81 @@ private:
void attachToThreadGroup();
};
class BackgroundSchedulePoolTaskInfo : public std::enable_shared_from_this<BackgroundSchedulePoolTaskInfo>, private boost::noncopyable
{
public:
BackgroundSchedulePoolTaskInfo(BackgroundSchedulePool & pool_, const std::string & log_name_, const BackgroundSchedulePool::TaskFunc & function_);
/// Schedule for execution as soon as possible (if not already scheduled).
/// If the task was already scheduled with delay, the delay will be ignored.
bool schedule();
/// Schedule for execution after specified delay.
bool scheduleAfter(size_t ms);
/// Further attempts to schedule become no-op. Will wait till the end of the current execution of the task.
void deactivate();
void activate();
/// Atomically activate task and schedule it for execution.
bool activateAndSchedule();
/// get Coordination::WatchCallback needed for notifications from ZooKeeper watches.
Coordination::WatchCallback getWatchCallback();
private:
friend class TaskNotification;
friend class BackgroundSchedulePool;
void execute();
void scheduleImpl(std::lock_guard<std::mutex> & schedule_mutex_lock);
BackgroundSchedulePool & pool;
std::string log_name;
BackgroundSchedulePool::TaskFunc function;
std::mutex exec_mutex;
std::mutex schedule_mutex;
/// Invariants:
/// * If deactivated is true then scheduled, delayed and executing are all false.
/// * scheduled and delayed cannot be true at the same time.
bool deactivated = false;
bool scheduled = false;
bool delayed = false;
bool executing = false;
/// If the task is scheduled with delay, points to element of delayed_tasks.
BackgroundSchedulePool::DelayedTasks::iterator iterator;
};
using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePoolTaskInfo>;
class BackgroundSchedulePoolTaskHolder
{
public:
BackgroundSchedulePoolTaskHolder() = default;
explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
BackgroundSchedulePoolTaskHolder(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
BackgroundSchedulePoolTaskHolder & operator=(const BackgroundSchedulePoolTaskHolder & other) noexcept = delete;
BackgroundSchedulePoolTaskHolder & operator=(BackgroundSchedulePoolTaskHolder && other) noexcept = default;
~BackgroundSchedulePoolTaskHolder()
{
if (task_info)
task_info->deactivate();
}
BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }
private:
BackgroundSchedulePoolTaskInfoPtr task_info;
};
}
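
A hedged usage sketch of the holder's RAII behaviour, assuming only the declarations above (`exampleUsage` is illustrative, not part of the codebase):

```cpp
// Sketch only; relies on the BackgroundSchedulePool declarations above.
void exampleUsage(DB::BackgroundSchedulePool & pool)
{
    auto task = pool.createTask("ExampleTask", [] { /* periodic work */ });
    task->schedule();          // run as soon as a pool thread is free
                               // (task->scheduleAfter(500) would delay it instead)

    // When `task` goes out of scope, ~BackgroundSchedulePoolTaskHolder()
    // calls deactivate(), which waits for any in-flight execution to finish,
    // so a task can never outlive its owner.
}
```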

View File

@ -533,12 +533,6 @@ void SettingString::write(WriteBuffer & buf) const
}
void SettingChar::checkStringIsACharacter(const String & x) const
{
if (x.size() != 1)
throw Exception("A setting's value string has to be an exactly one character long", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
}
String SettingChar::toString() const
{
return String(1, value);
@ -552,9 +546,10 @@ void SettingChar::set(char x)
void SettingChar::set(const String & x)
{
checkStringIsACharacter(x);
value = x[0];
changed = true;
if (x.size() > 1)
throw Exception("A setting's value string has to be an exactly one character long", ErrorCodes::SIZE_OF_FIXED_STRING_DOESNT_MATCH);
char c = (x.size() == 1) ? x[0] : '\0';
set(c);
}
void SettingChar::set(const Field & x)
@ -565,10 +560,9 @@ void SettingChar::set(const Field & x)
void SettingChar::set(ReadBuffer & buf)
{
String x;
readBinary(x, buf);
checkStringIsACharacter(x);
set(x);
String s;
readBinary(s, buf);
set(s);
}
void SettingChar::write(WriteBuffer & buf) const

View File

@ -335,9 +335,6 @@ struct SettingString
struct SettingChar
{
private:
void checkStringIsACharacter(const String & x) const;
public:
char value;
bool changed = false;

View File

@ -5,8 +5,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/CurrentMetrics.h>
#include <Common/ThreadPool.h>
#include <Common/MemoryTracker.h>
#include <Poco/Ext/ThreadNumber.h>
namespace CurrentMetrics

View File

@ -43,6 +43,9 @@ struct BlockIO
BlockIO & operator= (const BlockIO & rhs)
{
if (this == &rhs)
return *this;
out.reset();
in.reset();
process_list_entry.reset();
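
The self-assignment guard added above matters because the assignment resets the members before copying from `rhs`. A toy analogue (not the real BlockIO) shows what would go wrong without it:

```cpp
#include <cassert>
#include <memory>

struct Holder
{
    std::shared_ptr<int> ptr = std::make_shared<int>(42);

    Holder & operator=(const Holder & rhs)
    {
        if (this == &rhs)   // without this guard, `h = h;` would reset ptr
            return *this;   // first and then copy an already-empty pointer
        ptr.reset();        // mirrors out.reset()/in.reset() above
        ptr = rhs.ptr;
        return *this;
    }
};

int main()
{
    Holder h;
    h = h;                  // safe: the guard short-circuits
    assert(h.ptr && *h.ptr == 42);
}
```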

View File

@ -1,7 +1,6 @@
#include <future>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h>
#include <Common/CurrentThread.h>

View File

@ -8,8 +8,6 @@
#include <condition_variable>
class MemoryTracker;
namespace DB
{

View File

@ -11,7 +11,6 @@
#include <DataStreams/IBlockInputStream.h>
#include <Common/setThreadName.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Common/ThreadPool.h>

View File

@ -690,10 +690,9 @@ void DataTypeLowCardinality::deserializeBinaryBulkWithMultipleStreams(
};
if (!settings.continuous_reading)
{
low_cardinality_state->num_pending_rows = 0;
if (!settings.continuous_reading)
{
/// Remember in state that some granules were skipped and we need to update dictionary.
low_cardinality_state->need_update_dictionary = true;
}

View File

@ -305,7 +305,7 @@ void registerInputFormatCapnProto(FormatFactory & factory)
[](ReadBuffer & buf, const Block & sample, const Context & context, UInt64 max_block_size, const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "capnp")),
std::make_shared<CapnProtoRowInputStream>(buf, sample, FormatSchemaInfo(context, "CapnProto")),
sample,
max_block_size,
settings);

View File

@ -11,20 +11,29 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schema_file_extension, bool schema_required)
namespace
{
String getFormatSchemaDefaultFileExtension(const String & format)
{
if (format == "Protobuf")
return "proto";
else if (format == "CapnProto")
return "capnp";
else
return "";
}
}
FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & format)
{
String format_schema = context.getSettingsRef().format_schema.toString();
if (format_schema.empty())
{
if (schema_required)
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'"),
ErrorCodes::BAD_ARGUMENTS);
}
return;
}
"The format " + format + " requires a schema. The 'format_schema' setting should be set", ErrorCodes::BAD_ARGUMENTS);
String default_file_extension = getFormatSchemaDefaultFileExtension(format);
size_t colon_pos = format_schema.find(':');
Poco::Path path;
@ -33,12 +42,11 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
{
throw Exception(
"Format schema requires the 'format_schema' setting to have the 'schema_file:message_name' format"
+ (schema_file_extension.empty() ? "" : ", e.g. 'schema." + schema_file_extension + ":Message'") + ". Got '" + format_schema
+ (default_file_extension.empty() ? "" : ", e.g. 'schema." + default_file_extension + ":Message'") + ". Got '" + format_schema
+ "'",
ErrorCodes::BAD_ARGUMENTS);
}
is_null = false;
message_name = format_schema.substr(colon_pos + 1);
auto default_schema_directory = [&context]()
@ -51,8 +59,8 @@ FormatSchemaInfo::FormatSchemaInfo(const Context & context, const String & schem
return context.hasGlobalContext() && (context.getGlobalContext().getApplicationType() == Context::ApplicationType::SERVER);
};
if (path.getExtension().empty() && !schema_file_extension.empty())
path.setExtension(schema_file_extension);
if (path.getExtension().empty() && !default_file_extension.empty())
path.setExtension(default_file_extension);
if (path.isAbsolute())
{

View File

@ -10,10 +10,7 @@ class Context;
class FormatSchemaInfo
{
public:
FormatSchemaInfo() = default;
FormatSchemaInfo(const Context & context, const String & schema_file_extension = String(), bool schema_required = true);
bool isNull() const { return is_null; }
FormatSchemaInfo(const Context & context, const String & format);
/// Returns path to the schema file.
const String & schemaPath() const { return schema_path; }
@ -26,7 +23,6 @@ public:
const String & messageName() const { return message_name; }
private:
bool is_null = true;
String schema_path;
String schema_directory;
String message_name;

View File

@ -75,7 +75,7 @@ void registerInputFormatProtobuf(FormatFactory & factory)
const FormatSettings & settings)
{
return std::make_shared<BlockInputStreamFromRowInputStream>(
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "proto")),
std::make_shared<ProtobufRowInputStream>(buf, sample, FormatSchemaInfo(context, "Protobuf")),
sample, max_block_size, settings);
});
}

View File

@ -38,7 +38,7 @@ void registerOutputFormatProtobuf(FormatFactory & factory)
"Protobuf", [](WriteBuffer & buf, const Block & header, const Context & context, const FormatSettings &)
{
return std::make_shared<BlockOutputStreamFromRowOutputStream>(
std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "proto")), header);
std::make_shared<ProtobufRowOutputStream>(buf, header, FormatSchemaInfo(context, "Protobuf")), header);
});
}

View File

@ -119,6 +119,8 @@ struct IntegerRoundingComputation
return x;
}
}
__builtin_unreachable();
}
static ALWAYS_INLINE T compute(T x, T scale)
@ -132,6 +134,8 @@ struct IntegerRoundingComputation
case ScaleMode::Negative:
return computeImpl(x, scale);
}
__builtin_unreachable();
}
static ALWAYS_INLINE void compute(const T * __restrict in, size_t scale, T * __restrict out)
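
The `__builtin_unreachable()` calls added above tell the compiler that the preceding switch is exhaustive. A minimal standalone illustration of the same idiom, with a hypothetical enum and function:

```cpp
enum class Sign { Zero, Positive, Negative };

// Every enumerator is handled, but the compiler cannot prove the switch is
// exhaustive, so it may warn that "control reaches end of non-void function".
// __builtin_unreachable() (GCC/Clang) documents that falling out of the
// switch is impossible and silences the warning without a dummy return.
int toInt(Sign sign)
{
    switch (sign)
    {
        case Sign::Zero:     return 0;
        case Sign::Positive: return 1;
        case Sign::Negative: return -1;
    }
    __builtin_unreachable();
}
```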

View File

@ -40,6 +40,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int TOO_MANY_BYTES;
extern const int NOT_IMPLEMENTED;
extern const int HYPERSCAN_CANNOT_SCAN_TEXT;
}
/// Is the LIKE expression reduced to finding a substring in a string?
@ -289,10 +290,10 @@ struct MultiMatchAnyImpl
#if USE_HYPERSCAN
const auto & hyperscan_regex = MultiRegexps::get<FindAnyIndex, MultiSearchDistance>(needles, edit_distance);
hs_scratch_t * scratch = nullptr;
hs_error_t err = hs_alloc_scratch(hyperscan_regex->get(), &scratch);
hs_error_t err = hs_clone_scratch(hyperscan_regex->getScratch(), &scratch);
if (err != HS_SUCCESS)
throw Exception("Could not allocate scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
throw Exception("Could not clone scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
MultiRegexps::ScratchPtr smart_scratch(scratch);
@ -316,14 +317,16 @@ struct MultiMatchAnyImpl
if (length > std::numeric_limits<UInt32>::max())
throw Exception("Too long string to search", ErrorCodes::TOO_MANY_BYTES);
res[i] = 0;
hs_scan(
hyperscan_regex->get(),
err = hs_scan(
hyperscan_regex->getDB(),
reinterpret_cast<const char *>(haystack_data.data()) + offset,
length,
0,
smart_scratch.get(),
on_match,
&res[i]);
if (err != HS_SUCCESS)
throw Exception("Failed to scan with hyperscan", ErrorCodes::HYPERSCAN_CANNOT_SCAN_TEXT);
offset = haystack_offsets[i];
}
#else

View File

@ -173,10 +173,7 @@ struct PositionImpl
/// We check that the entry does not pass through the boundaries of strings.
if (pos + needle.size() < begin + offsets[i])
{
size_t prev_offset = i != 0 ? offsets[i - 1] : 0;
res[i] = 1 + Impl::countChars(reinterpret_cast<const char *>(begin + prev_offset), reinterpret_cast<const char *>(pos));
}
res[i] = 1 + Impl::countChars(reinterpret_cast<const char *>(begin + offsets[i - 1]), reinterpret_cast<const char *>(pos));
else
res[i] = 0;
@ -306,7 +303,8 @@ struct MultiSearchAllPositionsImpl
const std::vector<StringRef> & needles,
PaddedPODArray<UInt64> & res)
{
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64 {
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64
{
return 1 + Impl::countChars(reinterpret_cast<const char *>(start), reinterpret_cast<const char *>(end));
};
Impl::createMultiSearcherInBigHaystack(needles).searchAllPositions(haystack_data, haystack_offsets, res_callback, res);
@ -341,7 +339,8 @@ struct MultiSearchFirstPositionImpl
const std::vector<StringRef> & needles,
PaddedPODArray<UInt64> & res)
{
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64 {
auto res_callback = [](const UInt8 * start, const UInt8 * end) -> UInt64
{
return 1 + Impl::countChars(reinterpret_cast<const char *>(start), reinterpret_cast<const char *>(end));
};
Impl::createMultiSearcherInBigHaystack(needles).searchFirstPosition(haystack_data, haystack_offsets, res_callback, res);

View File

@ -1,6 +1,8 @@
#pragma once
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
@ -11,6 +13,7 @@
#include <Common/ProfileEvents.h>
#include <common/StringRef.h>
#include <Common/config.h>
#if USE_HYPERSCAN
# if __has_include(<hs/hs.h>)
@ -84,24 +87,32 @@ namespace MultiRegexps
};
using CompilerError = std::unique_ptr<hs_compile_error_t, HyperscanDeleter<decltype(&hs_free_compile_error), &hs_free_compile_error>>;
using ScratchPtr = std::unique_ptr<hs_scratch_t, DB::MultiRegexps::HyperscanDeleter<decltype(&hs_free_scratch), &hs_free_scratch>>;
using Regexps = std::unique_ptr<hs_database_t, HyperscanDeleter<decltype(&hs_free_database), &hs_free_database>>;
using ScratchPtr = std::unique_ptr<hs_scratch_t, HyperscanDeleter<decltype(&hs_free_scratch), &hs_free_scratch>>;
using DataBasePtr = std::unique_ptr<hs_database_t, HyperscanDeleter<decltype(&hs_free_database), &hs_free_database>>;
using Pool = ObjectPoolMap<Regexps, std::pair<std::vector<String>, std::optional<UInt32>>>;
/// If CompileForEditDistance is False, edit_distance must be nullopt
template <bool FindAnyIndex, bool CompileForEditDistance>
inline Pool::Pointer get(const std::vector<StringRef> & patterns, std::optional<UInt32> edit_distance)
/// The Hyperscan database is thread-safe across multiple threads; the scratch space is not, but we can clone it whenever the searcher uses it
class Regexps
{
/// C++11 has thread-safe function-local statics on most modern compilers.
static Pool known_regexps; /// Different variables for different pattern parameters.
public:
Regexps(hs_database_t * db_, hs_scratch_t * scratch_) : db{db_}, scratch{scratch_} {}
std::vector<String> str_patterns;
str_patterns.reserve(patterns.size());
for (const StringRef & ref : patterns)
str_patterns.push_back(ref.toString());
hs_database_t * getDB() const { return db.get(); }
hs_scratch_t * getScratch() const { return scratch.get(); }
private:
DataBasePtr db;
ScratchPtr scratch;
};
return known_regexps.get({str_patterns, edit_distance}, [&str_patterns, edit_distance]
struct Pool
{
/// Mutex for finding in map
std::mutex mutex;
/// Patterns + possible edit_distance to database and scratch
std::map<std::pair<std::vector<String>, std::optional<UInt32>>, Regexps> storage;
};
template <bool FindAnyIndex, bool CompileForEditDistance>
inline Regexps constructRegexps(const std::vector<String> & str_patterns, std::optional<UInt32> edit_distance)
{
(void)edit_distance;
/// Common pointers
@ -183,8 +194,39 @@ namespace MultiRegexps
ProfileEvents::increment(ProfileEvents::RegexpCreated);
return new Regexps{db};
});
hs_scratch_t * scratch = nullptr;
err = hs_alloc_scratch(db, &scratch);
if (err != HS_SUCCESS)
throw Exception("Could not allocate scratch space for hyperscan", ErrorCodes::CANNOT_ALLOCATE_MEMORY);
return Regexps{db, scratch};
}
/// If CompileForEditDistance is False, edit_distance must be nullopt
template <bool FindAnyIndex, bool CompileForEditDistance>
inline Regexps * get(const std::vector<StringRef> & patterns, std::optional<UInt32> edit_distance)
{
/// C++11 has thread-safe function-local statics on most modern compilers.
static Pool known_regexps; /// Different variables for different pattern parameters.
std::vector<String> str_patterns;
str_patterns.reserve(patterns.size());
for (const StringRef & ref : patterns)
str_patterns.push_back(ref.toString());
std::unique_lock lock(known_regexps.mutex);
auto it = known_regexps.storage.find({str_patterns, edit_distance});
if (known_regexps.storage.end() == it)
it = known_regexps.storage.emplace(
std::pair{str_patterns, edit_distance},
constructRegexps<FindAnyIndex, CompileForEditDistance>(str_patterns, edit_distance)).first;
lock.unlock();
return &it->second;
}
}
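
A sketch of how this cloning is meant to be used at scan time, assuming the `Regexps` wrapper above: the compiled database is shared read-only across threads, while each scan clones the prototype scratch so no two threads ever write to the same scratch space. `scanOnce` and its signature are illustrative, not part of the codebase:

```cpp
#include <hs/hs.h>

int scanOnce(const DB::MultiRegexps::Regexps & regexps, const char * data, unsigned length,
             match_event_handler on_match, void * context)
{
    hs_scratch_t * scratch = nullptr;
    if (hs_clone_scratch(regexps.getScratch(), &scratch) != HS_SUCCESS)
        return -1;                     // could not clone scratch space

    hs_error_t err = hs_scan(regexps.getDB(), data, length, 0, scratch, on_match, context);
    hs_free_scratch(scratch);          // each clone must be freed by its user
    return err == HS_SUCCESS ? 0 : -1;
}
```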

View File

@ -56,6 +56,7 @@ private:
struct UnpackedArrays
{
size_t base_rows = 0;
std::vector<char> is_const;
std::vector<const NullMap *> null_maps;
std::vector<const ColumnArray::ColumnOffsets::Container *> offsets;
@ -246,6 +247,8 @@ FunctionArrayIntersect::UnpackedArrays FunctionArrayIntersect::prepareArrays(con
arrays.offsets.resize(columns_number);
arrays.nested_columns.resize(columns_number);
bool all_const = true;
for (auto i : ext::range(0, columns_number))
{
auto argument_column = columns[i].get();
@ -257,6 +260,9 @@ FunctionArrayIntersect::UnpackedArrays FunctionArrayIntersect::prepareArrays(con
if (auto argument_column_array = typeid_cast<const ColumnArray *>(argument_column))
{
if (!arrays.is_const[i])
all_const = false;
arrays.offsets[i] = &argument_column_array->getOffsets();
arrays.nested_columns[i] = &argument_column_array->getData();
if (auto column_nullable = typeid_cast<const ColumnNullable *>(arrays.nested_columns[i]))
@ -269,6 +275,25 @@ FunctionArrayIntersect::UnpackedArrays FunctionArrayIntersect::prepareArrays(con
throw Exception{"Arguments for function " + getName() + " must be arrays.", ErrorCodes::LOGICAL_ERROR};
}
if (all_const)
{
arrays.base_rows = arrays.offsets.front()->size();
}
else
{
for (auto i : ext::range(0, columns_number))
{
if (arrays.is_const[i])
continue;
size_t rows = arrays.offsets[i]->size();
if (arrays.base_rows == 0 && rows > 0)
arrays.base_rows = rows;
else if (arrays.base_rows != rows)
throw Exception("Non-const array columns in function " + getName() + "should have same rows", ErrorCodes::LOGICAL_ERROR);
}
}
return arrays;
}
@ -277,7 +302,7 @@ void FunctionArrayIntersect::executeImpl(Block & block, const ColumnNumbers & ar
const auto & return_type = block.getByPosition(result).type;
auto return_type_array = checkAndGetDataType<DataTypeArray>(return_type.get());
if (!return_type)
if (!return_type_array)
throw Exception{"Return type for function " + getName() + " must be array.", ErrorCodes::LOGICAL_ERROR};
const auto & nested_return_type = return_type_array->getNestedType();
@ -352,7 +377,7 @@ template <typename Map, typename ColumnType, bool is_numeric_column>
ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, MutableColumnPtr result_data_ptr)
{
auto args = arrays.nested_columns.size();
auto rows = arrays.offsets.front()->size();
auto rows = arrays.base_rows;
bool all_nullable = true;
@ -392,26 +417,42 @@ ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, Mutable
for (auto arg : ext::range(0, args))
{
bool current_has_nullable = false;
size_t off = (*arrays.offsets[arg])[row];
size_t off;
// const array has only one row
bool const_arg = arrays.is_const[arg];
if (const_arg)
off = (*arrays.offsets[arg])[0];
else
off = (*arrays.offsets[arg])[row];
for (auto i : ext::range(prev_off[arg], off))
{
if (arrays.null_maps[arg] && (*arrays.null_maps[arg])[i])
current_has_nullable = true;
else
{
typename Map::mapped_type * value = nullptr;
if constexpr (is_numeric_column)
++map[columns[arg]->getElement(i)];
value = &map[columns[arg]->getElement(i)];
else if constexpr (std::is_same<ColumnType, ColumnString>::value || std::is_same<ColumnType, ColumnFixedString>::value)
++map[columns[arg]->getDataAt(i)];
value = &map[columns[arg]->getDataAt(i)];
else
{
const char * data = nullptr;
++map[columns[arg]->serializeValueIntoArena(i, arena, data)];
value = &map[columns[arg]->serializeValueIntoArena(i, arena, data)];
}
if (*value == arg)
++(*value);
}
}
prev_off[arg] = off;
if (const_arg)
prev_off[arg] = 0;
if (!current_has_nullable)
all_has_nullable = false;
}
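The `if (*value == arg) ++(*value);` guard is the heart of the counting scheme: an element's counter is bumped for argument number `arg` only if it currently equals `arg`, i.e. the element was seen exactly once in every previous array, so duplicates within one array never double-count and an element missing from any earlier array can never catch up. A standalone sketch of that counting intersection over plain vectors (illustrative only, no ClickHouse types):

#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

/// Returns the elements present in every input array, counting each at most once per array.
std::vector<int64_t> intersect(const std::vector<std::vector<int64_t>> & args)
{
    std::unordered_map<int64_t, size_t> counts;
    for (size_t arg = 0; arg < args.size(); ++arg)
        for (int64_t x : args[arg])
        {
            size_t & value = counts[x];   /// default-initialized to 0 on first sight
            if (value == arg)             /// seen in all previous args, not yet in this one
                ++value;
        }

    std::vector<int64_t> result;
    for (const auto & [x, count] : counts)
        if (count == args.size())         /// survived every argument
            result.push_back(x);
    return result;
}

int main()
{
    for (int64_t x : intersect({{1, 2, 2, 3}, {2, 3, 4}, {3, 2}}))
        std::cout << x << '\n';           /// prints 2 and 3 (in hash order)
}

The const-array handling above is just bookkeeping on top of this: a constant argument contributes a single row of offsets that is re-read for every result row.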

View File

@ -17,15 +17,15 @@ struct ExtractQueryStringAndFragment
res_data = data;
res_size = 0;
Pos pos = data;
Pos end = pos + size;
Pos end = data + size;
Pos pos;
if (end != (pos = find_first_symbols<'?'>(pos, end)))
if (end != (pos = find_first_symbols<'?'>(data, end)))
{
res_data = pos + (without_leading_char ? 1 : 0);
res_size = end - res_data;
}
else if (end != (pos = find_first_symbols<'#'>(pos, end)))
else if (end != (pos = find_first_symbols<'#'>(data, end)))
{
res_data = pos;
res_size = end - res_data;

View File

@ -36,7 +36,7 @@ protected:
return false;
BufferBase::set(buffer->position(), buffer->available(), 0);
put_delimiter = true;
put_delimiter = (delimiter != 0);
}
return true;

View File

@ -187,6 +187,9 @@ off_t ReadBufferAIO::doSeek(off_t off, int whence)
pos = working_buffer.end();
first_unread_pos_in_file = new_pos_in_file;
/// If we seek backwards, then it's not EOF anymore.
is_eof = false;
/// We can not use the result of the current asynchronous request.
skip();
}

View File

@ -43,6 +43,7 @@ protected:
ProfileCallback profile_callback;
clockid_t clock_type;
/// Child implementations should be able to seek backwards.
virtual off_t doSeek(off_t off, int whence) = 0;
};

View File

@ -0,0 +1,71 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
#include <Core/Defines.h>
#include <port/unistd.h>
#include <IO/ReadBufferAIO.h>
#include <fstream>
namespace
{
std::string createTmpFileForEOFtest()
{
char pattern[] = "/tmp/fileXXXXXX";
char * dir = ::mkdtemp(pattern);
return std::string(dir) + "/foo";
}
void prepare_for_eof(std::string & filename, std::string & buf)
{
static const std::string symbols = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
filename = createTmpFileForEOFtest();
size_t n = 10 * DEFAULT_AIO_FILE_BLOCK_SIZE;
buf.reserve(n);
for (size_t i = 0; i < n; ++i)
buf += symbols[i % symbols.length()];
std::ofstream out(filename.c_str());
out << buf;
}
}
TEST(ReadBufferAIOTest, TestReadAfterAIO)
{
using namespace DB;
std::string data;
std::string file_path;
prepare_for_eof(file_path, data);
ReadBufferAIO testbuf(file_path);
std::string newdata;
newdata.resize(data.length());
size_t total_read = testbuf.read(newdata.data(), newdata.length());
EXPECT_EQ(total_read, data.length());
EXPECT_TRUE(testbuf.eof());
testbuf.seek(data.length() - 100);
std::string smalldata;
smalldata.resize(100);
size_t read_after_eof = testbuf.read(smalldata.data(), smalldata.size());
EXPECT_EQ(read_after_eof, 100);
EXPECT_TRUE(testbuf.eof());
testbuf.seek(0);
std::string repeatdata;
repeatdata.resize(data.length());
size_t read_after_eof_big = testbuf.read(repeatdata.data(), repeatdata.size());
EXPECT_EQ(read_after_eof_big, data.length());
EXPECT_TRUE(testbuf.eof());
}

View File

@ -1,7 +1,7 @@
#include "DNSCacheUpdater.h"
#include <Common/DNSResolver.h>
#include <Interpreters/Context.h>
#include <Storages/MergeTree/BackgroundProcessingPool.h>
#include <Core/BackgroundSchedulePool.h>
#include <Common/ProfileEvents.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
@ -16,8 +16,6 @@ namespace ProfileEvents
namespace DB
{
using BackgroundProcessingPoolTaskInfo = BackgroundProcessingPool::TaskInfo;
namespace ErrorCodes
{
extern const int TIMEOUT_EXCEEDED;
@ -56,18 +54,15 @@ static bool isNetworkError()
DNSCacheUpdater::DNSCacheUpdater(Context & context_)
: context(context_), pool(context_.getBackgroundPool())
: context(context_), pool(context_.getSchedulePool())
{
task_handle = pool.addTask([this] () { return run(); });
task_handle = pool.createTask("DNSCacheUpdater", [this]{ run(); });
}
BackgroundProcessingPoolTaskResult DNSCacheUpdater::run()
void DNSCacheUpdater::run()
{
/// TODO: Ensure that we get the global counter (not thread-local)
auto num_current_network_exceptions = ProfileEvents::global_counters[ProfileEvents::NetworkErrors].load(std::memory_order_relaxed);
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache
&& time(nullptr) > last_update_time + min_update_period_seconds)
if (num_current_network_exceptions >= last_num_network_erros + min_errors_to_update_cache)
{
try
{
@ -77,32 +72,18 @@ BackgroundProcessingPoolTaskResult DNSCacheUpdater::run()
context.reloadClusterConfig();
last_num_network_erros = num_current_network_exceptions;
last_update_time = time(nullptr);
return BackgroundProcessingPoolTaskResult::SUCCESS;
task_handle->scheduleAfter(min_update_period_seconds * 1000);
return;
}
catch (...)
{
/// Do not increment ProfileEvents::NetworkErrors twice
if (isNetworkError())
return BackgroundProcessingPoolTaskResult::ERROR;
throw;
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
/// According to BackgroundProcessingPool logic, if task has done work, it could be executed again immediately.
return BackgroundProcessingPoolTaskResult::NOTHING_TO_DO;
task_handle->scheduleAfter(10 * 1000);
}
DNSCacheUpdater::~DNSCacheUpdater()
{
if (task_handle)
pool.removeTask(task_handle);
task_handle.reset();
}
bool DNSCacheUpdater::incrementNetworkErrorEventsIfNeeded()
{
if (isNetworkError())

View File

@ -4,35 +4,31 @@
#include <ctime>
#include <cstddef>
#include <Core/BackgroundSchedulePool.h>
namespace DB
{
class Context;
class BackgroundProcessingPool;
class BackgroundProcessingPoolTaskInfo;
enum class BackgroundProcessingPoolTaskResult;
/// Adds a task to BackgroundSchedulePool that watches ProfileEvents::NetworkErrors and updates the DNS cache if the counter has increased.
class DNSCacheUpdater
{
public:
explicit DNSCacheUpdater(Context & context);
~DNSCacheUpdater();
/// Checks if it is a network error and increments ProfileEvents::NetworkErrors
static bool incrementNetworkErrorEventsIfNeeded();
private:
BackgroundProcessingPoolTaskResult run();
void run();
Context & context;
BackgroundProcessingPool & pool;
std::shared_ptr<BackgroundProcessingPoolTaskInfo> task_handle;
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder task_handle;
size_t last_num_network_erros = 0;
time_t last_update_time = 0;
static constexpr size_t min_errors_to_update_cache = 3;
static constexpr time_t min_update_period_seconds = 45;
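The task now paces itself: after a successful cache drop it re-arms with scheduleAfter(min_update_period_seconds * 1000), otherwise it re-checks in ten seconds, instead of returning a SUCCESS/ERROR/NOTHING_TO_DO status for a polling pool to interpret. A rough sketch of that self-pacing logic with the pool replaced by a plain loop (the counter, timings, and output are simplified stand-ins):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

std::atomic<size_t> network_errors{0};    /// stand-in for ProfileEvents::NetworkErrors

/// One run of the task; the return value is the delay it asks to be rescheduled with.
std::chrono::seconds runDnsUpdaterOnce(size_t & last_seen_errors)
{
    constexpr size_t min_errors_to_update = 3;
    size_t current = network_errors.load(std::memory_order_relaxed);
    if (current >= last_seen_errors + min_errors_to_update)
    {
        std::cout << "dropping DNS caches\n";  /// the real task drops caches and reloads the cluster config
        last_seen_errors = current;
        return std::chrono::seconds(45);       /// back off after a successful update
    }
    return std::chrono::seconds(10);           /// otherwise re-check sooner
}

int main()
{
    size_t last_seen_errors = 0;
    for (int i = 0; i < 3; ++i)                /// a real pool keeps rescheduling forever
    {
        auto delay = runDnsUpdaterOnce(last_seen_errors);
        std::this_thread::sleep_for(delay / 10);  /// shortened tenfold for the demo
        network_errors += 2;                   /// simulate accumulating network errors
    }
}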

View File

@ -19,6 +19,7 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/IColumn.h>
@ -406,7 +407,7 @@ void ExpressionAnalyzer::getAggregates(const ASTPtr & ast, ExpressionActionsPtr
getRootActions(arguments[i], true, actions);
const std::string & name = arguments[i]->getColumnName();
types[i] = actions->getSampleBlock().getByName(name).type;
types[i] = recursiveRemoveLowCardinality(actions->getSampleBlock().getByName(name).type);
aggregate.argument_names[i] = name;
}
@ -974,19 +975,11 @@ void ExpressionAnalyzer::collectUsedColumns()
RequiredSourceColumnsVisitor::Data columns_context;
RequiredSourceColumnsVisitor(columns_context).visit(query);
NameSet required = columns_context.requiredColumns();
NameSet source_column_names;
for (const auto & column : source_columns)
source_column_names.insert(column.name);
#if 0
std::cerr << "Query: " << query << std::endl;
std::cerr << "CTX: " << columns_context << std::endl;
std::cerr << "source_columns: ";
for (const auto & name : source_columns)
std::cerr << "'" << name.name << "' ";
std::cerr << "required: ";
for (const auto & pr : required)
std::cerr << "'" << pr.first << "' ";
std::cerr << std::endl;
#endif
NameSet required = columns_context.requiredColumns();
if (columns_context.has_table_join)
{
@ -1013,10 +1006,10 @@ void ExpressionAnalyzer::collectUsedColumns()
}
}
NameSet array_join_sources;
if (columns_context.has_array_join)
{
/// Insert the columns required for the ARRAY JOIN calculation into the required columns list.
NameSet array_join_sources;
for (const auto & result_source : syntax->array_join_result_to_source)
array_join_sources.insert(result_source.second);
@ -1063,15 +1056,39 @@ void ExpressionAnalyzer::collectUsedColumns()
if (!unknown_required_source_columns.empty())
{
std::stringstream ss;
ss << "query: '" << query << "' ";
ss << columns_context;
ss << "source_columns: ";
for (const auto & name : source_columns)
ss << "'" << name.name << "' ";
ss << "Missing columns:";
for (const auto & name : unknown_required_source_columns)
ss << " '" << name << "'";
ss << " while processing query: '" << query << "'";
throw Exception("Unknown identifier: " + *unknown_required_source_columns.begin()
+ (select_query && !select_query->tables ? ". Note that there are no tables (FROM clause) in your query" : "")
+ ", context: " + ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
ss << ", required columns:";
for (const auto & name : columns_context.requiredColumns())
ss << " '" << name << "'";
if (!source_column_names.empty())
{
ss << ", source columns:";
for (const auto & name : source_column_names)
ss << " '" << name << "'";
}
else
ss << ", no source columns";
if (columns_context.has_table_join)
{
ss << ", joined columns:";
for (const auto & column : analyzedJoin().available_joined_columns)
ss << " '" << column.name_and_type.name << "'";
}
if (!array_join_sources.empty())
{
ss << ", arrayJoin columns:";
for (const auto & name : array_join_sources)
ss << " '" << name << "'";
}
throw Exception(ss.str(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
}

View File

@ -8,7 +8,6 @@
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnVector.h>
#include <Common/LRUCache.h>
#include <Common/MemoryTracker.h>
#include <Common/typeid_cast.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>

View File

@ -203,7 +203,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
if (settings.allow_experimental_multiple_joins_emulation)
{
JoinToSubqueryTransformVisitor::Data join_to_subs_data;
JoinToSubqueryTransformVisitor::Data join_to_subs_data{context};
JoinToSubqueryTransformVisitor(join_to_subs_data).visit(query_ptr);
}

View File

@ -298,12 +298,17 @@ void Join::setSampleBlock(const Block & block)
if (kind != ASTTableJoin::Kind::Left and kind != ASTTableJoin::Kind::Inner)
throw Exception("ASOF only supports LEFT and INNER as base joins", ErrorCodes::NOT_IMPLEMENTED);
if (key_columns.back()->sizeOfValueIfFixed() != sizeof(ASOFTimeType))
const IColumn * asof_column = key_columns.back();
size_t asof_size;
asof_type = AsofRowRefs::getTypeSize(asof_column, asof_size);
if (!asof_type)
{
std::string msg = "ASOF join column needs to have size ";
msg += std::to_string(sizeof(ASOFTimeType));
std::string msg = "ASOF join not supported for type";
msg += asof_column->getFamilyName();
throw Exception(msg, ErrorCodes::BAD_TYPE_OF_FIELD);
}
key_columns.pop_back();
if (key_columns.empty())
@ -314,7 +319,7 @@ void Join::setSampleBlock(const Block & block)
/// Therefore, add it back in such that it can be extracted appropriately from the full stored
/// key_columns and key_sizes
init(chooseMethod(key_columns, key_sizes));
key_sizes.push_back(sizeof(ASOFTimeType));
key_sizes.push_back(asof_size);
}
else
{
@ -325,6 +330,9 @@ void Join::setSampleBlock(const Block & block)
sample_block_with_columns_to_add = materializeBlock(block);
blocklist_sample = Block(block.getColumnsWithTypeAndName());
prepareBlockListStructure(blocklist_sample);
/// Move the key columns from `sample_block_with_columns_to_add` to `sample_block_with_keys`, keeping the order.
size_t pos = 0;
while (pos < sample_block_with_columns_to_add.columns())
@ -357,47 +365,19 @@ void Join::setSampleBlock(const Block & block)
convertColumnToNullable(sample_block_with_columns_to_add.getByPosition(i));
}
void Join::TSRowRef::insert(Join::ASOFTimeType t, const Block * block, size_t row_num)
{
ts.insert(std::pair(t, RowRef(block, row_num)));
}
std::string Join::TSRowRef::dumpStructure() const
{
std::stringstream ss;
for (auto const& x : ts)
{
ss << "(t=" << x.first << " row_num=" << x.second.row_num << " ptr=" << x.second.block << "),";
}
return ss.str();
}
size_t Join::TSRowRef::size() const
{
return ts.size();
}
std::optional<std::pair<Join::ASOFTimeType, Join::RowRef>> Join::TSRowRef::findAsof(Join::ASOFTimeType t) const
{
auto it = ts.upper_bound(t);
if (it == ts.cbegin())
return {};
return *(--it);
}
namespace
{
/// Inserting an element into a hash table of the form `key -> reference to a string`, which will then be used by JOIN.
template <ASTTableJoin::Strictness STRICTNESS, typename Map, typename KeyGetter>
struct Inserter
{
static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
static void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool);
};
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Any, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@ -409,7 +389,7 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::All, Map, KeyGetter>
{
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(const Join &, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
@ -435,26 +415,22 @@ namespace
template <typename Map, typename KeyGetter>
struct Inserter<ASTTableJoin::Strictness::Asof, Map, KeyGetter>
{
template<typename AsofGetter>
static ALWAYS_INLINE void insert(Map & map, KeyGetter & key_getter, AsofGetter & asof_getter, Block * stored_block, size_t i, Arena & pool)
static ALWAYS_INLINE void insert(Join & join, Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool,
const IColumn * asof_column)
{
auto emplace_result = key_getter.emplaceKey(map, i, pool);
typename Map::mapped_type * time_series_map = &emplace_result.getMapped();
if (emplace_result.isInserted())
{
time_series_map = new (time_series_map) typename Map::mapped_type();
}
auto k = asof_getter.getKey(i, pool);
time_series_map->insert(k, stored_block, i);
// std::cout << "inserted key into time series map=" << k << " result=" << time_series_map->dumpStructure() << std::endl;
time_series_map->insert(join.getAsofType(), join.getAsofData(), asof_column, stored_block, i);
}
};
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool has_null_map>
void NO_INLINE insertFromBlockImplTypeCase(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
const IColumn * asof_column [[maybe_unused]] = nullptr;
@ -469,30 +445,28 @@ namespace
continue;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
auto asof_getter = Join::AsofGetterType(asof_column);
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, asof_getter, stored_block, i, pool);
} else
Inserter<STRICTNESS, Map, KeyGetter>::insert(map, key_getter, stored_block, i, pool);
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool, asof_column);
else
Inserter<STRICTNESS, Map, KeyGetter>::insert(join, map, key_getter, stored_block, i, pool);
}
}
template <ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
void insertFromBlockImplType(
Map & map, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Map & map, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
if (null_map)
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, true>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
else
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImplTypeCase<STRICTNESS, KeyGetter, Map, false>(join, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
}
template <ASTTableJoin::Strictness STRICTNESS, typename Maps>
void insertFromBlockImpl(
Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
Join & join, Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns,
const Sizes & key_sizes, Block * stored_block, ConstNullMapPtr null_map, Arena & pool)
{
switch (type)
@ -503,7 +477,7 @@ namespace
#define M(TYPE) \
case Join::Type::TYPE: \
insertFromBlockImplType<STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, std::remove_reference_t<decltype(*maps.TYPE)>>::Type>(\
*maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
join, *maps.TYPE, rows, key_columns, key_sizes, stored_block, null_map, pool); \
break;
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
@ -511,10 +485,47 @@ namespace
}
}
void Join::prepareBlockListStructure(Block & stored_block)
{
if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream expects them to be.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block.getPositionByName(name);
ColumnWithTypeAndName col = stored_block.safeGetByPosition(pos);
stored_block.erase(pos);
stored_block.insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
/// Remove the key columns from stored_block, as they are not needed.
/// However, do not erase the ASOF column if this is an asof join
for (const auto & name : key_names_right)
{
if (strictness == ASTTableJoin::Strictness::Asof && name == key_names_right.back())
{
LOG_DEBUG(log, "preventing removal of ASOF join column with name=" << name);
break; // this is the last column so break is OK
}
if (!erased.count(name))
stored_block.erase(stored_block.getPositionByName(name));
erased.insert(name);
}
}
}
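For RIGHT and FULL joins the stored blocks must begin with the key columns, because NonJoinedBlockInputStream later reads them positionally; for the other kinds the keys are dropped, except that an ASOF join keeps its timestamp column (the last key). The move-to-front step, isolated over a toy column list (illustrative names only):

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

/// Move the named key columns to the front, in key order,
/// preserving the relative order of the remaining columns.
void moveKeysToFront(std::vector<std::string> & columns, const std::vector<std::string> & keys)
{
    size_t key_num = 0;
    for (const auto & key : keys)
    {
        auto it = std::find(columns.begin(), columns.end(), key);
        if (it == columns.end())
            continue;                                    /// the real code would throw instead
        std::string col = *it;
        columns.erase(it);                               /// take the column out ...
        columns.insert(columns.begin() + key_num, col);  /// ... and reinsert it at the front
        ++key_num;
    }
}

int main()
{
    std::vector<std::string> columns{"value", "k2", "extra", "k1"};
    moveKeysToFront(columns, {"k1", "k2"});
    for (const auto & c : columns)
        std::cout << c << ' ';   /// k1 k2 value extra
}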
bool Join::insertFromBlock(const Block & block)
{
std::unique_lock lock(rwlock);
LOG_DEBUG(log, "joinBlock: " << block.dumpStructure());
if (empty())
throw Exception("Logical error: Join was not initialized", ErrorCodes::LOGICAL_ERROR);
@ -543,33 +554,9 @@ bool Join::insertFromBlock(const Block & block)
blocks.push_back(block);
Block * stored_block = &blocks.back();
if (isRightOrFull(kind))
{
/** Move the key columns to the beginning of the block.
* This is where NonJoinedBlockInputStream will expect.
*/
size_t key_num = 0;
for (const auto & name : key_names_right)
{
size_t pos = stored_block->getPositionByName(name);
ColumnWithTypeAndName col = stored_block->safeGetByPosition(pos);
stored_block->erase(pos);
stored_block->insert(key_num, std::move(col));
++key_num;
}
}
else
{
NameSet erased; /// HOTFIX: there could be duplicates in JOIN ON section
prepareBlockListStructure(*stored_block);
/// Remove the key columns from stored_block, as they are not needed.
for (const auto & name : key_names_right)
{
if (!erased.count(name))
stored_block->erase(stored_block->getPositionByName(name));
erased.insert(name);
}
}
LOG_DEBUG(log, "insertFromBlock stored_block=" << stored_block->dumpStructure());
size_t size = stored_block->columns();
@ -590,7 +577,7 @@ bool Join::insertFromBlock(const Block & block)
{
dispatch([&](auto, auto strictness_, auto & map)
{
insertFromBlockImpl<strictness_>(type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
insertFromBlockImpl<strictness_>(*this, type, map, rows, key_columns, key_sizes, stored_block, null_map, pool);
});
}
@ -608,7 +595,9 @@ public:
AddedColumns(const Block & sample_block_with_columns_to_add,
const Block & block_with_columns_to_add,
const Block & block, size_t num_columns_to_skip)
const Block & block,
const Block & blocklist_sample,
const ColumnsWithTypeAndName & extras)
{
size_t num_columns_to_add = sample_block_with_columns_to_add.columns();
@ -622,8 +611,14 @@ public:
/// Don't insert column if it's in left block or not explicitly required.
if (!block.has(src_column.name) && block_with_columns_to_add.has(src_column.name))
addColumn(src_column, num_columns_to_skip + i);
addColumn(src_column);
}
for (auto & extra : extras)
addColumn(extra);
for (auto & tn : type_name)
right_indexes.push_back(blocklist_sample.getPositionByName(tn.second));
}
size_t size() const { return columns.size(); }
@ -651,12 +646,11 @@ private:
MutableColumns columns;
std::vector<size_t> right_indexes;
void addColumn(const ColumnWithTypeAndName & src_column, size_t idx)
void addColumn(const ColumnWithTypeAndName & src_column)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name);
right_indexes.push_back(idx);
}
};
@ -678,20 +672,6 @@ void addFoundRow(const typename Map::mapped_type & mapped, AddedColumns & added,
}
};
template <typename Map>
bool addFoundRowAsof(const typename Map::mapped_type & mapped, AddedColumns & added, IColumn::Offset & current_offset [[maybe_unused]], Join::ASOFTimeType asof_key)
{
if (auto v = mapped.findAsof(asof_key))
{
std::pair<Join::ASOFTimeType, Join::RowRef> res = *v;
// std::cout << "Adder::addFound" << " to_add" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << " found=" << res.first << std::endl;
added.appendFromBlock(*res.second.block, res.second.row_num);
return true;
}
// std::cout << "Adder::addFound" << " not found in map" << num_columns_to_add << " i=" << i << " asof_key=" << asof_key << std::endl;
return false;
}
template <bool _add_missing>
void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & current_offset [[maybe_unused]])
{
@ -707,7 +687,7 @@ void addNotFoundRow(AddedColumns & added [[maybe_unused]], IColumn::Offset & cur
/// Makes filter (1 if row presented in right table) and returns offsets to replicate (for ALL JOINS).
template <bool _add_missing, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map, bool _has_null_map>
std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, IColumn::Filter & filter)
{
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
@ -740,14 +720,11 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
{
Join::AsofGetterType asof_getter(asof_column);
auto asof_key = asof_getter.getKey(i, pool);
bool actually_found = addFoundRowAsof<Map>(mapped, added_columns, current_offset, asof_key);
if (actually_found)
if (const RowRef * found = mapped.findAsof(join.getAsofType(), join.getAsofData(), asof_column, i))
{
filter[i] = 1;
mapped.setUsed();
added_columns.appendFromBlock(*found->block, found->row_num);
}
else
addNotFoundRow<_add_missing>(added_columns, current_offset);
@ -772,7 +749,7 @@ std::unique_ptr<IColumn::Offsets> NO_INLINE joinRightIndexedColumns(
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename KeyGetter, typename Map>
IColumn::Filter joinRightColumns(
const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
const Join & join, const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
{
constexpr bool left_or_full = static_in_v<KIND, ASTTableJoin::Kind::Left, ASTTableJoin::Kind::Full>;
@ -781,17 +758,17 @@ IColumn::Filter joinRightColumns(
if (null_map)
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, true>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
else
offsets_to_replicate = joinRightIndexedColumns<left_or_full, STRICTNESS, KeyGetter, Map, false>(
map, rows, key_columns, key_sizes, added_columns, null_map, filter);
join, map, rows, key_columns, key_sizes, added_columns, null_map, filter);
return filter;
}
template <ASTTableJoin::Kind KIND, ASTTableJoin::Strictness STRICTNESS, typename Maps>
IColumn::Filter switchJoinRightColumns(
Join::Type type,
Join::Type type, const Join & join,
const Maps & maps_, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes,
AddedColumns & added_columns, ConstNullMapPtr null_map,
std::unique_ptr<IColumn::Offsets> & offsets_to_replicate)
@ -801,7 +778,7 @@ IColumn::Filter switchJoinRightColumns(
#define M(TYPE) \
case Join::Type::TYPE: \
return joinRightColumns<KIND, STRICTNESS, typename KeyGetterForType<Join::Type::TYPE, const std::remove_reference_t<decltype(*maps_.TYPE)>>::Type>(\
*maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
join, *maps_.TYPE, rows, key_columns, key_sizes, added_columns, null_map, offsets_to_replicate);
APPLY_FOR_JOIN_VARIANTS(M)
#undef M
@ -865,25 +842,22 @@ void Join::joinBlockImpl(
/** For LEFT/INNER JOIN, the saved blocks do not contain keys.
* For FULL/RIGHT JOIN, the saved blocks contain keys;
* but they will not be used at this stage of joining (and will be in `AdderNonJoined`), and they need to be skipped.
* For ASOF, the last column is used as the ASOF column
*/
size_t num_columns_to_skip = 0;
if constexpr (right_or_full)
num_columns_to_skip = keys_size;
/// Add new columns to the block.
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, num_columns_to_skip);
ColumnsWithTypeAndName extras;
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Asof)
extras.push_back(sample_block_with_keys.getByName(key_names_right.back()));
AddedColumns added(sample_block_with_columns_to_add, block_with_columns_to_add, block, blocklist_sample, extras);
std::unique_ptr<IColumn::Offsets> offsets_to_replicate;
IColumn::Filter row_filter = switchJoinRightColumns<KIND, STRICTNESS>(
type, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
type, *this, maps_, block.rows(), key_columns, key_sizes, added, null_map, offsets_to_replicate);
for (size_t i = 0; i < added.size(); ++i)
block.insert(added.moveColumn(i));
/// Filter & insert missing rows
auto right_keys = requiredRightKeys(key_names_right, columns_added_by_join);
if constexpr (STRICTNESS == ASTTableJoin::Strictness::Any || STRICTNESS == ASTTableJoin::Strictness::Asof)

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/RowRefs.h>
#include <Core/SettingsCommon.h>
#include <Common/Arena.h>
@ -130,42 +131,9 @@ public:
size_t getTotalByteCount() const;
ASTTableJoin::Kind getKind() const { return kind; }
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
/// Map for a time series
using ASOFTimeType = UInt32;
using AsofGetterType = ColumnsHashing::HashMethodOneNumber<ASOFTimeType, ASOFTimeType, ASOFTimeType, false>;
struct TSRowRef
{
// TODO use the arena allocator to get memory for this
// This would require ditching std::map because std::allocator is incompatible with the arena allocator
std::map<ASOFTimeType, RowRef> ts;
TSRowRef() {}
void insert(ASOFTimeType t, const Block * block, size_t row_num);
std::optional<std::pair<ASOFTimeType, RowRef>> findAsof(ASOFTimeType t) const;
std::string dumpStructure() const;
size_t size() const;
};
AsofRowRefs::Type getAsofType() const { return *asof_type; }
AsofRowRefs::LookupLists & getAsofData() { return asof_lookup_lists; }
const AsofRowRefs::LookupLists & getAsofData() const { return asof_lookup_lists; }
/** Depending on template parameter, adds or doesn't add a flag, that element was used (row was joined).
* Depending on template parameter, decide whether to overwrite existing values when encountering the same key again
@ -297,7 +265,7 @@ public:
using MapsAnyFull = MapsTemplate<WithFlags<true, false, RowRef>>;
using MapsAnyFullOverwrite = MapsTemplate<WithFlags<true, true, RowRef>>;
using MapsAllFull = MapsTemplate<WithFlags<true, false, RowRefList>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, TSRowRef>>;
using MapsAsof = MapsTemplate<WithFlags<false, false, AsofRowRefs>>;
template <ASTTableJoin::Kind KIND>
struct KindTrait
@ -400,6 +368,8 @@ private:
private:
Type type = Type::EMPTY;
std::optional<AsofRowRefs::Type> asof_type;
AsofRowRefs::LookupLists asof_lookup_lists;
static Type chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_sizes);
@ -410,6 +380,9 @@ private:
/// Block with key columns in the same order they appear in the right-side table.
Block sample_block_with_keys;
/// Block as it would appear in the BlockList
Block blocklist_sample;
Poco::Logger * log;
/// Limits for maximum map size.
@ -426,6 +399,11 @@ private:
void init(Type type_);
/** Take an inserted block and discard everything that does not need to be stored
* For example, remove the keys, as they come from the LHS block, but keep the ASOF timestamps.
*/
void prepareBlockListStructure(Block & stored_block);
/// Throw an exception if blocks have different types of key columns.
void checkTypesOfKeys(const Block & block_left, const Names & key_names_left, const Block & block_right) const;

View File

@ -1,8 +1,10 @@
#include <Common/typeid_cast.h>
#include <Core/NamesAndTypes.h>
#include <Interpreters/JoinToSubqueryTransformVisitor.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/AsteriskSemantic.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -22,11 +24,122 @@ namespace ErrorCodes
extern const int TOO_DEEP_AST;
extern const int AMBIGUOUS_COLUMN_NAME;
extern const int NOT_IMPLEMENTED;
extern const int UNKNOWN_IDENTIFIER;
}
NamesAndTypesList getNamesAndTypeListFromTableExpression(const ASTTableExpression & table_expression, const Context & context);
namespace
{
/// Replace asterisks in select_expression_list with column identifiers
class ExtractAsterisksMatcher
{
public:
using Visitor = InDepthNodeVisitor<ExtractAsterisksMatcher, true>;
struct Data
{
std::unordered_map<String, NamesAndTypesList> table_columns;
std::vector<String> tables_order;
std::shared_ptr<ASTExpressionList> new_select_expression_list;
Data(const Context & context, const std::vector<const ASTTableExpression *> & table_expressions)
{
tables_order.reserve(table_expressions.size());
for (const auto & expr : table_expressions)
{
if (expr->subquery)
{
table_columns.clear();
tables_order.clear();
break;
}
String table_name = DatabaseAndTableWithAlias(*expr, context.getCurrentDatabase()).getQualifiedNamePrefix(false);
NamesAndTypesList columns = getNamesAndTypeListFromTableExpression(*expr, context);
tables_order.push_back(table_name);
table_columns.emplace(std::move(table_name), std::move(columns));
}
}
void addTableColumns(const String & table_name)
{
auto it = table_columns.find(table_name);
if (it == table_columns.end())
throw Exception("Unknown qualified identifier: " + table_name, ErrorCodes::UNKNOWN_IDENTIFIER);
for (const auto & column : it->second)
new_select_expression_list->children.push_back(
std::make_shared<ASTIdentifier>(std::vector<String>{it->first, column.name}));
}
};
static bool needChildVisit(ASTPtr &, const ASTPtr &) { return false; }
static void visit(ASTPtr & ast, Data & data)
{
if (auto * t = ast->as<ASTSelectQuery>())
visit(*t, ast, data);
if (auto * t = ast->as<ASTExpressionList>())
visit(*t, ast, data);
}
private:
static void visit(ASTSelectQuery & node, ASTPtr &, Data & data)
{
if (data.table_columns.empty())
return;
Visitor(data).visit(node.select_expression_list);
if (!data.new_select_expression_list)
return;
size_t pos = 0;
for (; pos < node.children.size(); ++pos)
if (node.children[pos].get() == node.select_expression_list.get())
break;
if (pos == node.children.size())
throw Exception("No select expressions list in select", ErrorCodes::NOT_IMPLEMENTED);
node.select_expression_list = data.new_select_expression_list;
node.children[pos] = node.select_expression_list;
}
static void visit(ASTExpressionList & node, ASTPtr &, Data & data)
{
bool has_asterisks = false;
data.new_select_expression_list = std::make_shared<ASTExpressionList>();
data.new_select_expression_list->children.reserve(node.children.size());
for (auto & child : node.children)
{
if (child->as<ASTAsterisk>())
{
has_asterisks = true;
for (auto & table_name : data.tables_order)
data.addTableColumns(table_name);
}
else if (child->as<ASTQualifiedAsterisk>())
{
has_asterisks = true;
if (child->children.size() != 1)
throw Exception("Logical error: qualified asterisk must have exactly one child", ErrorCodes::LOGICAL_ERROR);
ASTIdentifier & identifier = child->children[0]->as<ASTIdentifier &>();
data.addTableColumns(identifier.name);
}
else
data.new_select_expression_list->children.push_back(child);
}
if (!has_asterisks)
data.new_select_expression_list.reset();
}
};
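The matcher expands `*` into the columns of every table in FROM order and `t.*` into the columns of one table, so that after the query is split into subselects every select item is an unambiguous qualified identifier. A rough standalone sketch of that expansion over strings instead of AST nodes (simplified, illustrative only):

#include <iostream>
#include <string>
#include <utility>
#include <vector>

/// Tables as an ordered list of (name, columns), mirroring tables_order above.
using Table = std::pair<std::string, std::vector<std::string>>;

std::vector<std::string> expandAsterisks(const std::vector<std::string> & select,
                                         const std::vector<Table> & tables)
{
    std::vector<std::string> expanded;
    auto add_table = [&](const Table & t)
    {
        for (const auto & column : t.second)
            expanded.push_back(t.first + "." + column);
    };

    for (const auto & item : select)
    {
        if (item == "*")
        {
            for (const auto & t : tables)        /// every table, in FROM order
                add_table(t);
        }
        else if (item.size() > 2 && item.compare(item.size() - 2, 2, ".*") == 0)
        {
            const std::string name = item.substr(0, item.size() - 2);
            for (const auto & t : tables)        /// qualified asterisk: a single table
                if (t.first == name)
                    add_table(t);
        }
        else
            expanded.push_back(item);            /// already an explicit column
    }
    return expanded;
}

int main()
{
    std::vector<Table> tables{{"t1", {"a", "b"}}, {"t2", {"x"}}};
    for (const auto & col : expandAsterisks({"*", "t2.*"}, tables))
        std::cout << col << '\n';   /// t1.a t1.b t2.x t2.x
}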
/// Find columns with aliases to push them into rewritten subselects.
/// Normalize table aliases: table_name.column_name -> table_alias.column_name
/// Make aliases maps (alias -> column_name, column_name -> alias)
@ -41,7 +154,7 @@ struct ColumnAliasesMatcher
std::vector<std::pair<ASTIdentifier *, bool>> compound_identifiers;
std::set<String> allowed_long_names; /// original names allowed as aliases '--t.x as t.x' (select expressions only).
Data(std::vector<DatabaseAndTableWithAlias> && tables_)
Data(const std::vector<DatabaseAndTableWithAlias> && tables_)
: tables(tables_)
, public_names(false)
{}
@ -101,7 +214,7 @@ struct ColumnAliasesMatcher
visit(*t, ast, data);
if (ast->as<ASTAsterisk>() || ast->as<ASTQualifiedAsterisk>())
throw Exception("Multiple JOIN do not support asterisks yet", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Multiple JOIN do not support asterisks for complex queries yet", ErrorCodes::NOT_IMPLEMENTED);
}
static void visit(ASTIdentifier & node, ASTPtr &, Data & data)
@ -190,7 +303,7 @@ struct RewriteTablesVisitorData
}
};
bool needRewrite(ASTSelectQuery & select)
bool needRewrite(ASTSelectQuery & select, std::vector<const ASTTableExpression *> & table_expressions)
{
if (!select.tables)
return false;
@ -203,9 +316,16 @@ bool needRewrite(ASTSelectQuery & select)
if (num_tables <= 2)
return false;
for (size_t i = 1; i < tables->children.size(); ++i)
table_expressions.reserve(num_tables);
for (size_t i = 0; i < num_tables; ++i)
{
const auto * table = tables->children[i]->as<ASTTablesInSelectQueryElement>();
if (table && table->table_expression)
if (const auto * expression = table->table_expression->as<ASTTableExpression>())
table_expressions.push_back(expression);
if (!i)
continue;
if (!table || !table->table_join)
throw Exception("Multiple JOIN expects joined tables", ErrorCodes::LOGICAL_ERROR);
@ -223,6 +343,7 @@ bool needRewrite(ASTSelectQuery & select)
using RewriteMatcher = OneTypeMatcher<RewriteTablesVisitorData>;
using RewriteVisitor = InDepthNodeVisitor<RewriteMatcher, true>;
using ExtractAsterisksVisitor = ExtractAsterisksMatcher::Visitor;
using ColumnAliasesVisitor = InDepthNodeVisitor<ColumnAliasesMatcher, true>;
using AppendSemanticMatcher = OneTypeMatcher<AppendSemanticVisitorData>;
using AppendSemanticVisitor = InDepthNodeVisitor<AppendSemanticMatcher, true>;
@ -236,13 +357,17 @@ void JoinToSubqueryTransformMatcher::visit(ASTPtr & ast, Data & data)
visit(*t, ast, data);
}
void JoinToSubqueryTransformMatcher::visit(ASTSelectQuery & select, ASTPtr &, Data & data)
void JoinToSubqueryTransformMatcher::visit(ASTSelectQuery & select, ASTPtr & ast, Data & data)
{
using RevertedAliases = AsteriskSemantic::RevertedAliases;
if (!needRewrite(select))
std::vector<const ASTTableExpression *> table_expressions;
if (!needRewrite(select, table_expressions))
return;
ExtractAsterisksVisitor::Data asterisks_data(data.context, table_expressions);
ExtractAsterisksVisitor(asterisks_data).visit(ast);
ColumnAliasesVisitor::Data aliases_data(getDatabaseAndTables(select, ""));
if (select.select_expression_list)
{

View File

@ -6,6 +6,7 @@ namespace DB
{
class ASTSelectQuery;
class Context;
/// AST transformer. It rewrites multiple joins into a chain of (subselect + join).
/// 'select * from t1 join t2 on ... join t3 on ... join t4 on ...' would be rewritten as
@ -15,6 +16,7 @@ class JoinToSubqueryTransformMatcher
public:
struct Data
{
const Context & context;
bool done = false;
};

View File

@ -340,8 +340,9 @@ ASTs PredicateExpressionsOptimizer::getSelectQueryProjectionColumns(ASTPtr & ast
std::vector<DatabaseAndTableWithAlias> tables = getDatabaseAndTables(*select_query, context.getCurrentDatabase());
/// TODO: get tables from evaluateAsterisk instead of tablesOnly() to extract asterisks in general way
NameSet source_columns;
std::vector<TableWithColumnNames> tables_with_columns = TranslateQualifiedNamesVisitor::Data::tablesOnly(tables);
TranslateQualifiedNamesVisitor::Data qn_visitor_data({}, tables_with_columns, false);
TranslateQualifiedNamesVisitor::Data qn_visitor_data(source_columns, tables_with_columns, false);
TranslateQualifiedNamesVisitor(qn_visitor_data).visit(ast);
QueryAliasesVisitor::Data query_aliases_data{aliases};

View File

@ -0,0 +1,112 @@
#include <Interpreters/RowRefs.h>
#include <Common/typeid_cast.h>
#include <Common/ColumnsHashing.h>
#include <Core/Block.h>
#include <Columns/IColumn.h>
namespace DB
{
namespace
{
/// maps enum values to types
template <typename F>
void callWithType(AsofRowRefs::Type which, F && f)
{
switch (which)
{
case AsofRowRefs::Type::key32: return f(UInt32());
case AsofRowRefs::Type::key64: return f(UInt64());
case AsofRowRefs::Type::keyf32: return f(Float32());
case AsofRowRefs::Type::keyf64: return f(Float64());
}
__builtin_unreachable();
}
} // namespace
void AsofRowRefs::insert(Type type, LookupLists & lookup_data, const IColumn * asof_column, const Block * block, size_t row_num)
{
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
auto entry = Entry<T>(key, RowRef(block, row_num));
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
{
lookup_data.lookups.push_back(Lookups());
lookup_data.lookups.back() = LookupType();
lookups = &lookup_data.lookups.back();
}
std::get<LookupType>(*lookups).insert(entry);
};
callWithType(type, call);
}
const RowRef * AsofRowRefs::findAsof(Type type, const LookupLists & lookup_data, const IColumn * asof_column, size_t row_num) const
{
const RowRef * out = nullptr;
auto call = [&](const auto & t)
{
using T = std::decay_t<decltype(t)>;
using LookupType = typename Entry<T>::LookupType;
auto * column = typeid_cast<const ColumnVector<T> *>(asof_column);
T key = column->getElement(row_num);
std::lock_guard<std::mutex> lock(lookup_data.mutex);
if (!lookups)
return;
auto & typed_lookup = std::get<LookupType>(*lookups);
auto it = typed_lookup.upper_bound(Entry<T>(key));
if (it != typed_lookup.cbegin())
out = &((--it)->row_ref);
};
callWithType(type, call);
return out;
}
std::optional<AsofRowRefs::Type> AsofRowRefs::getTypeSize(const IColumn * asof_column, size_t & size)
{
if (typeid_cast<const ColumnVector<UInt32> *>(asof_column))
{
size = sizeof(UInt32);
return Type::key32;
}
else if (typeid_cast<const ColumnVector<UInt64> *>(asof_column))
{
size = sizeof(UInt64);
return Type::key64;
}
else if (typeid_cast<const ColumnVector<Float32> *>(asof_column))
{
size = sizeof(Float32);
return Type::keyf32;
}
else if (typeid_cast<const ColumnVector<Float64> *>(asof_column))
{
size = sizeof(Float64);
return Type::keyf64;
}
size = 0;
return {};
}
}
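findAsof implements the ASOF rule: of all stored rows whose key is less than or equal to the probe key, pick the latest. Over sorted entries that is upper_bound followed by one step back, since upper_bound yields the first entry strictly greater than the key. A compact illustration over a plain sorted vector (same semantics, simplified types):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <optional>
#include <vector>

struct Entry
{
    uint64_t asof_value;
    int row;                                   /// stand-in for RowRef
    bool operator<(const Entry & o) const { return asof_value < o.asof_value; }
};

/// Latest entry with asof_value <= key, or nothing if all entries are greater.
std::optional<int> findAsof(const std::vector<Entry> & sorted, uint64_t key)
{
    auto it = std::upper_bound(sorted.begin(), sorted.end(), Entry{key, 0});
    if (it == sorted.begin())
        return std::nullopt;                   /// even the smallest entry is > key
    return std::prev(it)->row;
}

int main()
{
    std::vector<Entry> entries{{10, 1}, {20, 2}, {30, 3}};   /// sorted by asof_value
    assert(findAsof(entries, 25) == 2);        /// 20 is the latest value <= 25
    assert(findAsof(entries, 20) == 2);        /// exact match counts
    assert(!findAsof(entries, 5));             /// nothing at or before 5
}

The real code runs the same search inside a std::variant of SortedLookupPODArray instantiations, selected once per join via AsofRowRefs::Type, so no per-row type dispatch is needed.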

View File

@ -0,0 +1,84 @@
#pragma once
#include <Columns/IColumn.h>
#include <Common/SortedLookupPODArray.h>
#include <optional>
#include <variant>
#include <list>
#include <mutex>
namespace DB
{
class Block;
/// Reference to the row in block.
struct RowRef
{
const Block * block = nullptr;
size_t row_num = 0;
RowRef() {}
RowRef(const Block * block_, size_t row_num_) : block(block_), row_num(row_num_) {}
};
/// Single linked list of references to rows. Used for ALL JOINs (non-unique JOINs)
struct RowRefList : RowRef
{
RowRefList * next = nullptr;
RowRefList() {}
RowRefList(const Block * block_, size_t row_num_) : RowRef(block_, row_num_) {}
};
class AsofRowRefs
{
public:
template <typename T>
struct Entry
{
using LookupType = SortedLookupPODArray<Entry<T>>;
T asof_value;
RowRef row_ref;
Entry(T v) : asof_value(v) {}
Entry(T v, RowRef rr) : asof_value(v), row_ref(rr) {}
bool operator < (const Entry & o) const
{
return asof_value < o.asof_value;
}
};
using Lookups = std::variant<
Entry<UInt32>::LookupType,
Entry<UInt64>::LookupType,
Entry<Float32>::LookupType,
Entry<Float64>::LookupType>;
struct LookupLists
{
mutable std::mutex mutex;
std::list<Lookups> lookups;
};
enum class Type
{
key32,
key64,
keyf32,
keyf64,
};
static std::optional<Type> getTypeSize(const IColumn * asof_column, size_t & type_size);
void insert(Type type, LookupLists &, const IColumn * asof_column, const Block * block, size_t row_num);
const RowRef * findAsof(Type type, const LookupLists &, const IColumn * asof_column, size_t row_num) const;
private:
Lookups * lookups = nullptr;
};
}

View File

@ -16,6 +16,7 @@
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTFunction.h>
#include <iostream>
namespace DB

View File

@ -59,7 +59,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
}
if (ast_col_decl.codec)
command.codec = compression_codec_factory.get(ast_col_decl.codec);
command.codec = compression_codec_factory.get(ast_col_decl.codec, command.data_type);
if (command_ast->column)
command.after_column = *getIdentifierName(command_ast->column);
@ -105,7 +105,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
}
if (ast_col_decl.codec)
command.codec = compression_codec_factory.get(ast_col_decl.codec);
command.codec = compression_codec_factory.get(ast_col_decl.codec, command.data_type);
command.if_exists = command_ast->if_exists;
@ -190,7 +190,13 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
ColumnDescription & column = columns_description.get(column_name);
if (codec)
{
/// The user didn't specify a data type, which means the data type doesn't change;
/// use the info about the old type.
if (data_type == nullptr)
codec->useInfoAboutType(column.type);
column.codec = codec;
}
if (!is_mutable())
{

View File

@ -22,7 +22,6 @@
#include <Common/typeid_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentThread.h>
#include <common/logger_useful.h>

View File

@ -16,7 +16,6 @@
#include <Common/ThreadPool.h>
namespace DB
{
@ -29,6 +28,8 @@ enum class BackgroundProcessingPoolTaskResult
ERROR,
NOTHING_TO_DO,
};
/** Using a fixed number of threads, perform an arbitrary number of tasks in an infinite loop.
* In this case, one task can run simultaneously from different threads.
* Designed for tasks that perform continuous background work (for example, merge).
@ -45,7 +46,6 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>;
BackgroundProcessingPool(int size_);
size_t getNumberOfThreads() const

View File

@ -1,7 +1,6 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Common/escapeForFileName.h>
#include <Common/MemoryTracker.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>

View File

@ -1,4 +1,3 @@
#include <Common/MemoryTracker.h>
#include <Storages/MergeTree/MergeTreeReaderStream.h>
#include <Poco/File.h>

View File

@ -4,7 +4,7 @@
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -120,7 +120,13 @@ StoragePtr StorageFactory::get(
auto it = storages.find(name);
if (it == storages.end())
{
auto hints = getHints(name);
if (!hints.empty())
throw Exception("Unknown table engine " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_STORAGE);
else
throw Exception("Unknown table engine " + name, ErrorCodes::UNKNOWN_STORAGE);
}
Arguments arguments
{

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/NamePrompter.h>
#include <Storages/IStorage.h>
#include <ext/singleton.h>
#include <unordered_map>
@ -17,7 +18,7 @@ class ASTStorage;
* In 'columns' Nested data structures must be flattened.
* You should subsequently call IStorage::startup method to work with table.
*/
class StorageFactory : public ext::singleton<StorageFactory>
class StorageFactory : public ext::singleton<StorageFactory>, public IHints<1, StorageFactory>
{
public:
struct Arguments
@ -58,6 +59,14 @@ public:
return storages;
}
std::vector<String> getAllRegisteredNames() const override
{
std::vector<String> result;
auto getter = [](const auto & pair) { return pair.first; };
std::transform(storages.begin(), storages.end(), std::back_inserter(result), getter);
return result;
}
private:
using Storages = std::unordered_map<std::string, Creator>;
Storages storages;

View File

@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <IO/WriteHelpers.h>
namespace DB
{
@ -32,7 +33,13 @@ TableFunctionPtr TableFunctionFactory::get(
auto it = functions.find(name);
if (it == functions.end())
{
auto hints = getHints(name);
if (!hints.empty())
throw Exception("Unknown table function " + name + ". Maybe you meant: " + toString(hints), ErrorCodes::UNKNOWN_FUNCTION);
else
throw Exception("Unknown table function " + name, ErrorCodes::UNKNOWN_FUNCTION);
}
return it->second();
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <TableFunctions/ITableFunction.h>
#include <Common/NamePrompter.h>
#include <ext/singleton.h>
@ -18,7 +19,7 @@ class Context;
/** Lets you get a table function by its name.
*/
class TableFunctionFactory final: public ext::singleton<TableFunctionFactory>
class TableFunctionFactory final: public ext::singleton<TableFunctionFactory>, public IHints<1, TableFunctionFactory>
{
public:
using Creator = std::function<TableFunctionPtr()>;
@ -50,6 +51,14 @@ public:
return functions;
}
std::vector<String> getAllRegisteredNames() const override
{
std::vector<String> result;
auto getter = [](const auto & pair) { return pair.first; };
std::transform(functions.begin(), functions.end(), std::back_inserter(result), getter);
return result;
}
private:
TableFunctions functions;
};
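Both factories now mix in IHints and, when a lookup fails, suggest near-misses drawn from getAllRegisteredNames(); NamePrompter presumably ranks candidates by string distance. A sketch of the idea using plain Levenshtein distance (the real prompter's metric, cutoff, and result count may differ):

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

/// Classic dynamic-programming Levenshtein distance.
size_t editDistance(const std::string & a, const std::string & b)
{
    std::vector<size_t> prev(b.size() + 1), curr(b.size() + 1);
    for (size_t j = 0; j <= b.size(); ++j)
        prev[j] = j;
    for (size_t i = 1; i <= a.size(); ++i)
    {
        curr[0] = i;
        for (size_t j = 1; j <= b.size(); ++j)
            curr[j] = std::min({prev[j] + 1,                            /// deletion
                                curr[j - 1] + 1,                        /// insertion
                                prev[j - 1] + (a[i - 1] != b[j - 1])}); /// substitution
        std::swap(prev, curr);
    }
    return prev[b.size()];
}

/// Suggest registered names within a small edit distance of the unknown one.
std::vector<std::string> getHints(const std::string & name, const std::vector<std::string> & registered)
{
    std::vector<std::string> hints;
    for (const auto & candidate : registered)
        if (editDistance(name, candidate) <= 2)
            hints.push_back(candidate);
    return hints;
}

int main()
{
    std::vector<std::string> engines{"MergeTree", "Memory", "Log", "TinyLog"};
    for (const auto & hint : getHints("MergeTre", engines))
        std::cout << "Maybe you meant: " << hint << '\n';   /// MergeTree
}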

View File

@ -341,7 +341,7 @@ def main(args):
if result_is_different:
diff = Popen(['diff', '--unified', reference_file, stdout_file], stdout = PIPE).communicate()[0]
diff = unicode(diff, errors='replace', encoding='utf-8')
cat = Popen(['cat', '-A'], stdin=PIPE, stdout=PIPE).communicate(input=diff)[0]
cat = Popen(['cat', '-vet'], stdin=PIPE, stdout=PIPE).communicate(input=diff.encode(encoding='utf-8', errors='replace'))[0]
failure = et.Element("failure", attrib = {"message": "result differs with reference"})
report_testcase.append(failure)
@ -367,12 +367,13 @@ def main(args):
print(colored("Break tests execution", "red"))
raise e
except:
(exc_type, exc_value) = sys.exc_info()[:2]
import traceback
exc_type, exc_value, tb = sys.exc_info()
error = et.Element("error", attrib = {"type": exc_type.__name__, "message": str(exc_value)})
report_testcase.append(error)
failures += 1
print("{0} - Test internal error: {1}\n{2}".format(MSG_FAIL, exc_type.__name__, exc_value))
print("{0} - Test internal error: {1}\n{2}\n{3}".format(MSG_FAIL, exc_type.__name__, exc_value, "\n".join(traceback.format_tb(tb, 10))))
finally:
dump_report(args.output, suite, name, report_testcase)

View File

@ -4,7 +4,21 @@ else()
set (TEST_USE_BINARIES CLICKHOUSE_TESTS_SERVER_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse CLICKHOUSE_TESTS_CLIENT_BIN_PATH=${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse)
endif()
find_program(DOCKER_CMD docker)
find_program(DOCKER_COMPOSE_CMD docker-compose)
find_program(PYTEST_CMD pytest)
find_program(SUDO_CMD sudo)
# Will mount only one binary into the docker container - a build with .so can't work.
if (MAKE_STATIC_LIBRARIES)
add_test (NAME integration WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND env ${TEST_USE_BINARIES} "CLICKHOUSE_TESTS_BASE_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/dbms/programs/server/" ${PYTEST_STARTER} pytest ${PYTEST_OPT})
if(MAKE_STATIC_LIBRARIES AND DOCKER_CMD)
if(INTEGRATION_USE_RUNNER AND SUDO_CMD)
add_test(NAME integration-runner WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${SUDO_CMD} ${CMAKE_CURRENT_SOURCE_DIR}/runner --binary ${ClickHouse_BINARY_DIR}/dbms/programs/clickhouse --configs-dir ${ClickHouse_SOURCE_DIR}/dbms/programs/server/)
message(STATUS "Using tests in docker with runner SUDO=${SUDO_CMD}; DOCKER=${DOCKER_CMD};")
endif()
if(NOT INTEGRATION_USE_RUNNER AND DOCKER_COMPOSE_CMD AND PYTEST_CMD)
# To run one test with debug:
# cmake . -DPYTEST_OPT="-ss;test_cluster_copier"
add_test(NAME integration-pytest WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND env ${TEST_USE_BINARIES} "CLICKHOUSE_TESTS_BASE_CONFIG_DIR=${ClickHouse_SOURCE_DIR}/dbms/programs/server/" ${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT})
message(STATUS "Using tests in docker DOCKER=${DOCKER_CMD}; DOCKER_COMPOSE=${DOCKER_COMPOSE_CMD}; PYTEST=${PYTEST_STARTER} ${PYTEST_CMD} ${PYTEST_OPT}")
endif()
endif()

View File

@ -17,6 +17,7 @@ import psycopg2
import requests
import base64
import pymongo
import urllib
import docker
from docker.errors import ContainerError
@ -496,6 +497,10 @@ class ClickHouseInstance:
def get_query_request(self, *args, **kwargs):
return self.client.get_query_request(*args, **kwargs)
# Connects to the instance via HTTP interface, sends a query and returns the answer
def http_query(self, sql, data=None):
return urllib.urlopen("http://"+self.ip_address+":8123/?query="+urllib.quote(sql,safe=''), data).read()
def restart_clickhouse(self, stop_start_wait_sec=5):
if not self.stay_alive:
raise Exception("clickhouse can be restarted only with stay_alive=True instance")

View File

@ -17,9 +17,12 @@ services:
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka1:9092"
KAFKA_ZOOKEEPER_CONNECT: "kafka_zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: INSIDE://localhost:9092,OUTSIDE://kafka1:19092
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:19092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: "kafka_zookeeper:2181"
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:

View File

@ -0,0 +1,39 @@
<yandex>
<remote_servers>
<source_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
</shard>
</source_cluster>
<default_cluster>
<shard>
<weight>1</weight>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</default_cluster>
</remote_servers>
<max_workers>1</max_workers>
<tables>
<table_crm_fin_account>
<cluster_pull>source_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>copier_test1</table_pull>
<cluster_push>default_cluster</cluster_push>
<database_push>default</database_push>
<table_push>copier_test1_1</table_push>
<engine>ENGINE = MergeTree PARTITION BY date ORDER BY date</engine>
<sharding_key>rand()</sharding_key>
</table_crm_fin_account>
</tables>
</yandex>

View File

@ -0,0 +1,109 @@
<yandex>
<remote_servers>
<source_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>s0_0_0</host>
<port>9000</port>
</replica>
</shard>
</source_cluster>
<destination_cluster>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>s1_1_0</host>
<port>9000</port>
</replica>
</shard>
</destination_cluster>
</remote_servers>
<!-- How many simultaneously active workers are possible. If you run more workers superfluous workers will sleep. -->
<max_workers>2</max_workers>
<!-- Setting used to fetch (pull) data from source cluster tables -->
<settings_pull>
<readonly>1</readonly>
</settings_pull>
<!-- Setting used to insert (push) data to destination cluster tables -->
<settings_push>
<readonly>0</readonly>
</settings_push>
<!-- Common setting for fetch (pull) and insert (push) operations. Also, copier process context uses it.
They are overlaid by <settings_pull/> and <settings_push/> respectively. -->
<settings>
<connect_timeout>3</connect_timeout>
<!-- Sync insert is set forcibly, leave it here just in case. -->
<insert_distributed_sync>1</insert_distributed_sync>
</settings>
<!-- Copying tasks description.
You can specify several table tasks in the same task description (in the same ZooKeeper node); they will be performed
sequentially.
-->
<tables>
<!-- A table task, copies one table. -->
<table_hits>
<!-- Source cluster name (from <remote_servers/> section) and tables in it that should be copied -->
<cluster_pull>source_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>ontime</table_pull>
<!-- <table_pull>onetime</table_pull> -->
<!-- Destination cluster name and tables in which the data should be inserted -->
<cluster_push>destination_cluster</cluster_push>
<database_push>default</database_push>
<table_push>ontime22</table_push>
<!-- <table_pull>onetime</table_pull> -->
<!-- Engine of destination tables.
If destination tables have not be created, workers create them using columns definition from source tables and engine
definition from here.
NOTE: If the first worker starts insert data and detects that destination partition is not empty then the partition will
be dropped and refilled, take it into account if you already have some data in destination tables. You could directly
specify partitions that should be copied in <enabled_partitions/>, they should be in quoted format like partition column of
system.parts table.
-->
<engine>
ENGINE = MergeTree() PARTITION BY Year ORDER BY (Year, FlightDate) SETTINGS index_granularity=8192
</engine>
<!-- Sharding key used to insert data to destination cluster -->
<sharding_key>jumpConsistentHash(intHash64(Year), 2)</sharding_key>
<!-- Optional expression that filter data while pull them from source servers -->
<!-- <where_condition>CounterID != 0</where_condition> -->
<!-- This section specifies partitions that should be copied, other partition will be ignored.
Partition names should have the same format as
partition column of system.parts table (i.e. a quoted text).
Since partition key of source and destination cluster could be different,
these partition names specify destination partitions.
NOTE: In spite of this section is optional (if it is not specified, all partitions will be copied),
it is strictly recommended to specify them explicitly.
If you already have some ready paritions on destination cluster they
will be removed at the start of the copying since they will be interpeted
as unfinished data from the previous copying!!!
-->
<enabled_partitions>
<partition>2017</partition>
</enabled_partitions>
</table_hits>
<!-- Next table to copy. It is not copied until the previous table finishes copying. -->
<!-- <table_visits>
</table_visits>
-->
</tables>
</yandex>
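For reference, the quoted partition names that <enabled_partitions/> expects can be read straight from system.parts on the source cluster. A minimal illustrative sketch (not part of this diff, written in the same integration-test style used below and assuming the harness's instance.query helper):

# Illustrative only: list the active partitions of the source table in the
# same format as the `partition` column of system.parts, which is exactly
# the format <enabled_partitions/> expects.
partitions = instance.query(
    "SELECT DISTINCT partition FROM system.parts "
    "WHERE database = 'default' AND table = 'ontime' AND active")
print(partitions)  # for PARTITION BY Year this yields e.g. 2016\n2017\n2018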

View File

@ -168,6 +168,52 @@ class Task_test_block_size:
ddl_check_query(instance, "DROP TABLE test_block_size ON CLUSTER cluster1")
class Task_no_index:

    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = "/clickhouse-copier/task_no_index"
        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_no_index.xml'), 'r').read()
        self.rows = 1000000

    def start(self):
        instance = cluster.instances['s0_0_0']
        instance.query("create table ontime (Year UInt16, FlightDate String) ENGINE = Memory")
        instance.query("insert into ontime values (2016, 'test6'), (2017, 'test7'), (2018, 'test8')")

    def check(self):
        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT Year FROM ontime22")) == TSV("2017\n")
        instance = cluster.instances['s0_0_0']
        instance.query("DROP TABLE ontime")
        instance = cluster.instances['s1_1_0']
        instance.query("DROP TABLE ontime22")


class Task_no_arg:

    def __init__(self, cluster):
        self.cluster = cluster
        self.zk_task_path = "/clickhouse-copier/task_no_arg"
        self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_no_arg.xml'), 'r').read()
        self.rows = 1000000

    def start(self):
        instance = cluster.instances['s0_0_0']
        instance.query("create table copier_test1 (date Date, id UInt32) engine = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 8192")
        instance.query("insert into copier_test1 values ('2016-01-01', 10);")

    def check(self):
        assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
        instance = cluster.instances['s0_0_0']
        instance.query("DROP TABLE copier_test1")
        instance = cluster.instances['s1_1_0']
        instance.query("DROP TABLE copier_test1_1")
def execute_task(task, cmd_options):
    task.start()
@ -229,6 +275,11 @@ def test_copy_month_to_week_partition_with_recovering(started_cluster):
def test_block_size(started_cluster):
    execute_task(Task_test_block_size(started_cluster), [])


def test_no_index(started_cluster):
    execute_task(Task_no_index(started_cluster), [])


def test_no_arg(started_cluster):
    execute_task(Task_no_arg(started_cluster), [])


if __name__ == '__main__':
    with contextmanager(started_cluster)() as cluster:

View File

@ -0,0 +1,6 @@
syntax = "proto3";
message KeyValuePair {
    uint64 key = 1;
    string value = 2;
}

View File

@ -0,0 +1,40 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
                                clickhouse_path_dir='clickhouse_path')


@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        instance.query('CREATE DATABASE test')
        yield cluster
    finally:
        cluster.shutdown()


def create_simple_table():
    instance.query("DROP TABLE IF EXISTS test.simple")
    instance.query('''
        CREATE TABLE test.simple (key UInt64, value String)
            ENGINE = MergeTree ORDER BY tuple();
        ''')


def test_protobuf_format_input(started_cluster):
    create_simple_table()
    instance.http_query(
        "INSERT INTO test.simple FORMAT Protobuf SETTINGS format_schema='simple:KeyValuePair'",
        "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def")
    assert instance.query("SELECT * from test.simple") == "1\tabc\n2\tdef\n"


def test_protobuf_format_output(started_cluster):
    create_simple_table()
    instance.query("INSERT INTO test.simple VALUES (1, 'abc'), (2, 'def')")
    assert instance.http_query("SELECT * FROM test.simple FORMAT Protobuf SETTINGS format_schema='simple:KeyValuePair'") == \
        "\x07\x08\x01\x12\x03abc\x07\x08\x02\x12\x03def"

View File

@ -0,0 +1,6 @@
syntax = "proto3";
message KeyValuePair {
    uint64 key = 1;
    string value = 2;
}

View File

@ -0,0 +1,76 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/kafka.proto
import sys
_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1'))
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import symbol_database as _symbol_database
from google.protobuf import descriptor_pb2
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name='clickhouse_path/format_schemas/kafka.proto',
package='',
syntax='proto3',
serialized_pb=_b('\n*clickhouse_path/format_schemas/kafka.proto\"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3')
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_KEYVALUEPAIR = _descriptor.Descriptor(
name='KeyValuePair',
full_name='KeyValuePair',
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name='key', full_name='KeyValuePair.key', index=0,
number=1, type=4, cpp_type=4, label=1,
has_default_value=False, default_value=0,
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
_descriptor.FieldDescriptor(
name='value', full_name='KeyValuePair.value', index=1,
number=2, type=9, cpp_type=9, label=1,
has_default_value=False, default_value=_b("").decode('utf-8'),
message_type=None, enum_type=None, containing_type=None,
is_extension=False, extension_scope=None,
options=None),
],
extensions=[
],
nested_types=[],
enum_types=[
],
options=None,
is_extendable=False,
syntax='proto3',
extension_ranges=[],
oneofs=[
],
serialized_start=46,
serialized_end=88,
)
DESCRIPTOR.message_types_by_name['KeyValuePair'] = _KEYVALUEPAIR
KeyValuePair = _reflection.GeneratedProtocolMessageType('KeyValuePair', (_message.Message,), dict(
DESCRIPTOR = _KEYVALUEPAIR,
__module__ = 'clickhouse_path.format_schemas.kafka_pb2'
# @@protoc_insertion_point(class_scope:KeyValuePair)
))
_sym_db.RegisterMessage(KeyValuePair)
# @@protoc_insertion_point(module_scope)

View File

@ -7,6 +7,17 @@ from helpers.test_tools import TSV
import json
import subprocess
from kafka import KafkaProducer
from google.protobuf.internal.encoder import _VarintBytes
"""
protoc --version
libprotoc 3.0.0
# to create kafka_pb2.py
protoc --python_out=. kafka.proto
"""
import kafka_pb2
# TODO: add test for run-time offset update in CH, if we manually update it on Kafka side.
@ -17,7 +28,8 @@ import subprocess
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
with_kafka=True)
with_kafka=True,
clickhouse_path_dir='clickhouse_path')
kafka_id = ''
@ -30,7 +42,7 @@ def check_kafka_is_available():
kafka_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'PLAINTEXT://localhost:9092'),
'INSIDE://localhost:9092'),
stdout=subprocess.PIPE)
p.communicate()
return p.returncode == 0
@ -56,7 +68,7 @@ def kafka_produce(topic, messages):
kafka_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'localhost:9092',
'INSIDE://localhost:9092',
'--topic',
topic,
'--sync',
@ -65,7 +77,21 @@ def kafka_produce(topic, messages):
stdin=subprocess.PIPE)
p.communicate(messages)
p.stdin.close()
print("Produced {} messages".format(len(messages.splitlines())))
print("Produced {} messages for topic {}".format(len(messages.splitlines()), topic))
def kafka_produce_protobuf_messages(topic, start_index, num_messages):
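    # Frame each serialized message with a varint length prefix so the consumer
    # can split the concatenated payload back into individual Protobuf messages
    # (length-delimited encoding, matching ClickHouse's Protobuf row format).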
    data = ''
    for i in range(start_index, start_index + num_messages):
        msg = kafka_pb2.KeyValuePair()
        msg.key = i
        msg.value = str(i)
        serialized_msg = msg.SerializeToString()
        data = data + _VarintBytes(len(serialized_msg)) + serialized_msg
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send(topic=topic, value=data)
    producer.flush()
    print("Produced {} messages for topic {}".format(num_messages, topic))
# Since everything is async and shaky when receiving messages from Kafka,
@ -110,7 +136,7 @@ def kafka_setup_teardown():
def test_kafka_settings_old_syntax(kafka_cluster):
instance.query('''
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('kafka1:9092', 'old', 'old', 'JSONEachRow', '\\n');
ENGINE = Kafka('kafka1:19092', 'old', 'old', 'JSONEachRow', '\\n');
''')
# Don't insert malformed messages since old settings syntax
@ -133,7 +159,7 @@ def test_kafka_settings_new_syntax(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'new',
kafka_group_name = 'new',
kafka_format = 'JSONEachRow',
@ -168,7 +194,7 @@ def test_kafka_csv_with_delimiter(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'csv',
kafka_group_name = 'csv',
kafka_format = 'CSV',
@ -193,7 +219,7 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'tsv',
kafka_group_name = 'tsv',
kafka_format = 'TSV',
@ -213,6 +239,30 @@ def test_kafka_tsv_with_delimiter(kafka_cluster):
kafka_check_result(result, True)
def test_kafka_protobuf(kafka_cluster):
    instance.query('''
        CREATE TABLE test.kafka (key UInt64, value String)
            ENGINE = Kafka
            SETTINGS
                kafka_broker_list = 'kafka1:19092',
                kafka_topic_list = 'pb',
                kafka_group_name = 'pb',
                kafka_format = 'Protobuf',
                kafka_schema = 'kafka.proto:KeyValuePair';
        ''')

    kafka_produce_protobuf_messages('pb', 0, 20)
    kafka_produce_protobuf_messages('pb', 20, 1)
    kafka_produce_protobuf_messages('pb', 21, 29)

    result = ''
    for i in range(50):
        result += instance.query('SELECT * FROM test.kafka')
        if kafka_check_result(result):
            break
    kafka_check_result(result, True)
def test_kafka_materialized_view(kafka_cluster):
instance.query('''
DROP TABLE IF EXISTS test.view;
@ -220,7 +270,7 @@ def test_kafka_materialized_view(kafka_cluster):
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'json',
kafka_group_name = 'json',
kafka_format = 'JSONEachRow',

View File

@ -3,6 +3,6 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --host=localhost --query="SELECT 1";
$CLICKHOUSE_CLIENT --host localhost --query "SELECT 1";
$CLICKHOUSE_CLIENT -hlocalhost -q"SELECT 1";
clickhouse_client_removed_host_parameter --host="${CLICKHOUSE_HOST}" --query="SELECT 1";
clickhouse_client_removed_host_parameter --host "${CLICKHOUSE_HOST}" --query "SELECT 1";
clickhouse_client_removed_host_parameter -h"${CLICKHOUSE_HOST}" -q"SELECT 1";

View File

@ -3,4 +3,4 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
$CLICKHOUSE_CLIENT --host=localhost --query="SELECT * FROM ext" --format=Vertical --external --file=- --structure="s String" --name=ext --format=JSONEachRow <<< '{"s":"Hello"}'
clickhouse_client_removed_host_parameter --host="${CLICKHOUSE_HOST}" --query="SELECT * FROM ext" --format=Vertical --external --file=- --structure="s String" --name=ext --format=JSONEachRow <<< '{"s":"Hello"}'

View File

@ -52,6 +52,7 @@ query=hello world+foo+bar
query=hello world+foo+bar#a=b
query=hello world+foo+bar#a=b
query=hello world+foo+bar#a=b
#a=b
====CUT TO FIRST SIGNIFICANT SUBDOMAIN====
example.com
example.com
@ -92,3 +93,4 @@ http://www.example.com/a/b/c
http://www.example.com/a/b/c
http://paul@www.example.com/a/b/c
//paul@www.example.com/a/b/c
//paul@www.example.com/a/b/c

View File

@ -59,6 +59,7 @@ SELECT decodeURLComponent(queryStringAndFragment('http://127.0.0.1/?query=hello%
SELECT decodeURLComponent(queryStringAndFragment('http://127.0.0.1/?query=hello%20world+foo%2Bbar#a=b'));
SELECT decodeURLComponent(queryStringAndFragment('http://paul@127.0.0.1/?query=hello%20world+foo%2Bbar#a=b'));
SELECT decodeURLComponent(queryStringAndFragment('//paul@127.0.0.1/?query=hello%20world+foo%2Bbar#a=b'));
SELECT decodeURLComponent(queryStringAndFragment('//paul@127.0.0.1/#a=b'));
SELECT '====CUT TO FIRST SIGNIFICANT SUBDOMAIN====';
SELECT cutToFirstSignificantSubdomain('http://www.example.com');
@ -104,4 +105,5 @@ SELECT cutQueryStringAndFragment('http://www.example.com/a/b/c?a=b');
SELECT cutQueryStringAndFragment('http://www.example.com/a/b/c?a=b#d=f');
SELECT cutQueryStringAndFragment('http://paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutQueryStringAndFragment('//paul@www.example.com/a/b/c?a=b#d=f');
SELECT cutQueryStringAndFragment('//paul@www.example.com/a/b/c#d=f');

View File

@ -53,3 +53,5 @@ SELECT \n date, \n id, \n name, \n value, \n b.date, \n b.name
2000-01-01 1 test string 1 1 2000-01-01 test string 1 1
SELECT \n id, \n date, \n name, \n value\nFROM \n(\n SELECT \n toInt8(1) AS id, \n toDate(\'2000-01-01\') AS date\n FROM system.numbers \n LIMIT 1\n) \nANY LEFT JOIN \n(\n SELECT *\n FROM test.test \n WHERE date = toDate(\'2000-01-01\')\n) AS b USING (date, id)\nWHERE b.date = toDate(\'2000-01-01\')
1 2000-01-01 test string 1 1
SELECT \n date, \n id, \n name, \n value, \n `b.date`, \n `b.id`, \n `b.name`, \n `b.value`\nFROM \n(\n SELECT \n date, \n id, \n name, \n value, \n b.date, \n b.id, \n b.name, \n b.value\n FROM \n (\n SELECT \n date, \n id, \n name, \n value\n FROM test.test \n WHERE id = 1\n ) AS a \n ANY LEFT JOIN \n (\n SELECT *\n FROM test.test \n ) AS b ON id = b.id\n WHERE id = 1\n) \nWHERE id = 1
2000-01-01 1 test string 1 1 2000-01-01 1 test string 1 1

View File

@ -108,5 +108,8 @@ SELECT * FROM (SELECT * FROM test.test) ANY LEFT JOIN (SELECT * FROM test.test)
ANALYZE SELECT * FROM (SELECT toInt8(1) AS id, toDate('2000-01-01') AS date FROM system.numbers LIMIT 1) ANY LEFT JOIN (SELECT * FROM test.test) AS b USING date, id WHERE b.date = toDate('2000-01-01');
SELECT * FROM (SELECT toInt8(1) AS id, toDate('2000-01-01') AS date FROM system.numbers LIMIT 1) ANY LEFT JOIN (SELECT * FROM test.test) AS b USING date, id WHERE b.date = toDate('2000-01-01');
ANALYZE SELECT * FROM (SELECT * FROM (SELECT * FROM test.test) AS a ANY LEFT JOIN (SELECT * FROM test.test) AS b ON a.id = b.id) WHERE id = 1;
SELECT * FROM (SELECT * FROM (SELECT * FROM test.test) AS a ANY LEFT JOIN (SELECT * FROM test.test) AS b ON a.id = b.id) WHERE id = 1;
DROP TABLE IF EXISTS test.test;
DROP TABLE IF EXISTS test.test_view;

View File

@ -9,6 +9,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cur_name=$(basename "${BASH_SOURCE[0]}")
server_logs_file="${CLICKHOUSE_TMP}/${cur_name}_server.logs"
server_logs="--server_logs_file=$server_logs_file"
rm -f "$server_logs_file"

View File

@ -9,6 +9,7 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
cur_name=$(basename "${BASH_SOURCE[0]}")
server_logs_file=${CLICKHOUSE_TMP}/$cur_name"_server.logs"
server_logs="--server_logs_file=$server_logs_file"
rm -f "$server_logs_file"

View File

@ -5,17 +5,31 @@ import sys
import tempfile
import threading
import os, urllib
import subprocess
from io import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
CLICKHOUSE_HOST = os.environ.get('CLICKHOUSE_HOST', '127.0.0.1')
CLICKHOUSE_PORT_HTTP = os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')
SERVER_ADDRESS = ('127.0.0.1', 51234)
SERVER_ADDRESS_STR = 'http://' + ':'.join(str(s) for s in SERVER_ADDRESS) + "/"
#####################################################################################
# This test starts an HTTP server and serves data to a ClickHouse URL-engine based table.
# For it to work, the IP and port of the HTTP server (given below) must be
# accessible from the ClickHouse server.
#####################################################################################
# IP address of this host, accessible from the outside world.
HTTP_SERVER_HOST = subprocess.check_output(['hostname', '-i']).decode('utf-8').strip()
HTTP_SERVER_PORT = int(os.environ.get('CLICKHOUSE_TEST_HOST_EXPOSED_PORT', 51234))
# IP address and port of the HTTP server started from this script.
HTTP_SERVER_ADDRESS = (HTTP_SERVER_HOST, HTTP_SERVER_PORT)
HTTP_SERVER_URL_STR = 'http://' + ':'.join(str(s) for s in HTTP_SERVER_ADDRESS) + "/"
CSV_DATA = os.path.join(tempfile._get_default_tempdir(), next(tempfile._get_candidate_names()))
def get_ch_answer(query):
return urllib.urlopen(os.environ.get('CLICKHOUSE_URL', 'http://localhost:' + os.environ.get('CLICKHOUSE_PORT_HTTP', '8123')), data=query).read()
url = os.environ.get('CLICKHOUSE_URL', 'http://{host}:{port}'.format(host=CLICKHOUSE_HOST, port=CLICKHOUSE_PORT_HTTP))
return urllib.urlopen(url, data=query).read()
def check_answers(query, answer):
ch_answer = get_ch_answer(query)
@ -75,7 +89,7 @@ class CSVHTTPServer(BaseHTTPRequestHandler):
return
def start_server(requests_amount):
httpd = HTTPServer(SERVER_ADDRESS, CSVHTTPServer)
httpd = HTTPServer(HTTP_SERVER_ADDRESS, CSVHTTPServer)
def real_func():
for i in xrange(requests_amount):
@ -96,12 +110,12 @@ def test_select(table_name="", schema="str String,numuint UInt32,numint Int32,do
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, HTTP_SERVER_URL_STR))
for i in xrange(len(requests)):
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)
check_answers(requests[i].format(tbl=tbl), answers[i])
if table_name:
@ -113,19 +127,19 @@ def test_insert(table_name="", schema="str String,numuint UInt32,numint Int32,do
if table_name:
get_ch_answer("drop table if exists {}".format(table_name))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, SERVER_ADDRESS_STR))
get_ch_answer("create table {} ({}) engine=URL('{}', 'CSV')".format(table_name, schema, HTTP_SERVER_URL_STR))
for req in requests_insert:
tbl = table_name
if not tbl:
tbl = "table function url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
tbl = "table function url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)
get_ch_answer(req.format(tbl=tbl))
for i in xrange(len(requests_select)):
tbl = table_name
if not tbl:
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=SERVER_ADDRESS_STR, schema=schema)
tbl = "url('{addr}', 'CSV', '{schema}')".format(addr=HTTP_SERVER_URL_STR, schema=schema)
check_answers(requests_select[i].format(tbl=tbl), answers[i])
if table_name:

View File

@ -0,0 +1,3 @@
CODEC(Delta(4))
CODEC(Delta(4), LZ4)
CODEC(Delta(8), LZ4)

Some files were not shown because too many files have changed in this diff.