Merge branch 'master' of github.com:ClickHouse/ClickHouse

This commit is contained in:
Sergei Shtykov 2019-12-03 13:14:05 +03:00
commit b8e5383e0b
163 changed files with 2350 additions and 478 deletions

View File

@ -15,4 +15,4 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events ## Upcoming Events
* [ClickHouse Meetup in San Francisco](https://www.eventbrite.com/e/clickhouse-december-meetup-registration-78642047481) on December 3. * [ClickHouse Meetup in San Francisco](https://www.eventbrite.com/e/clickhouse-december-meetup-registration-78642047481) on December 3.
* [ClickHouse Meetup in Moscow](https://yandex.ru/promo/clickhouse/moscow-december-2019) on December 11.

View File

@ -11,7 +11,9 @@ endif ()
set(LIBUNWIND_C_SOURCES set(LIBUNWIND_C_SOURCES
${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1.c ${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1.c
${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1-gcc-ext.c ${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1-gcc-ext.c
${LIBUNWIND_SOURCE_DIR}/src/Unwind-sjlj.c) ${LIBUNWIND_SOURCE_DIR}/src/Unwind-sjlj.c
# Use unw_backtrace to override libgcc's backtrace symbol for better ABI compatibility
unwind-override.c)
set_source_files_properties(${LIBUNWIND_C_SOURCES} PROPERTIES COMPILE_FLAGS "-std=c99") set_source_files_properties(${LIBUNWIND_C_SOURCES} PROPERTIES COMPILE_FLAGS "-std=c99")
set(LIBUNWIND_ASM_SOURCES set(LIBUNWIND_ASM_SOURCES

View File

@ -0,0 +1,6 @@
#include <libunwind.h>
int backtrace(void ** buffer, int size)
{
return unw_backtrace(buffer, size);
}

View File

@ -376,6 +376,10 @@ if (USE_POCO_MONGODB)
dbms_target_link_libraries (PRIVATE ${Poco_MongoDB_LIBRARY}) dbms_target_link_libraries (PRIVATE ${Poco_MongoDB_LIBRARY})
endif() endif()
if (USE_POCO_REDIS)
dbms_target_link_libraries (PRIVATE ${Poco_Redis_LIBRARY})
endif()
if (USE_POCO_NETSSL) if (USE_POCO_NETSSL)
target_link_libraries (clickhouse_common_io PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY}) target_link_libraries (clickhouse_common_io PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
dbms_target_link_libraries (PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY}) dbms_target_link_libraries (PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})

View File

@ -15,6 +15,7 @@
#include <IO/ReadBufferFromString.h> #include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromPocoSocket.h> #include <IO/WriteBufferFromPocoSocket.h>
#include <Storages/IStorage.h> #include <Storages/IStorage.h>
#include <boost/algorithm/string/replace.hpp>
#if USE_POCO_NETSSL #if USE_POCO_NETSSL
#include <Poco/Net/SecureStreamSocket.h> #include <Poco/Net/SecureStreamSocket.h>
@ -267,30 +268,50 @@ void MySQLHandler::comPing()
packet_sender->sendPacket(OK_Packet(0x0, client_capability_flags, 0, 0, 0), true); packet_sender->sendPacket(OK_Packet(0x0, client_capability_flags, 0, 0, 0), true);
} }
static bool isFederatedServerSetupCommand(const String & query);
void MySQLHandler::comQuery(ReadBuffer & payload) void MySQLHandler::comQuery(ReadBuffer & payload)
{
String query = String(payload.position(), payload.buffer().end());
// This is a workaround in order to support adding ClickHouse to MySQL using federated server.
// As Clickhouse doesn't support these statements, we just send OK packet in response.
if (isFederatedServerSetupCommand(query))
{
packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);
}
else
{ {
bool with_output = false; bool with_output = false;
std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void { std::function<void(const String &)> set_content_type = [&with_output](const String &) -> void {
with_output = true; with_output = true;
}; };
const String query("select ''"); String replacement_query = "select ''";
ReadBufferFromString empty_select(query);
bool should_replace = false; bool should_replace = false;
// Translate query from MySQL to ClickHouse. // Translate query from MySQL to ClickHouse.
// This is a temporary workaround until ClickHouse supports the syntax "@@var_name". // This is a temporary workaround until ClickHouse supports the syntax "@@var_name".
if (std::string(payload.position(), payload.buffer().end()) == "select @@version_comment limit 1") // MariaDB client starts session with that query if (query == "select @@version_comment limit 1") // MariaDB client starts session with that query
{ {
should_replace = true; should_replace = true;
} }
// This is a workaround in order to support adding ClickHouse to MySQL using federated server.
if (0 == strncasecmp("SHOW TABLE STATUS LIKE", query.c_str(), 22))
{
should_replace = true;
replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query);
}
ReadBufferFromString replacement(replacement_query);
Context query_context = connection_context; Context query_context = connection_context;
executeQuery(should_replace ? empty_select : payload, *out, true, query_context, set_content_type, nullptr); executeQuery(should_replace ? replacement : payload, *out, true, query_context, set_content_type, nullptr);
if (!with_output) if (!with_output)
packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true); packet_sender->sendPacket(OK_Packet(0x00, client_capability_flags, 0, 0, 0), true);
} }
}
void MySQLHandler::authPluginSSL() void MySQLHandler::authPluginSSL()
{ {
@ -335,4 +356,33 @@ void MySQLHandlerSSL::finishHandshakeSSL(size_t packet_size, char * buf, size_t
#endif #endif
static bool isFederatedServerSetupCommand(const String & query)
{
return 0 == strncasecmp("SET NAMES", query.c_str(), 9) || 0 == strncasecmp("SET character_set_results", query.c_str(), 25)
|| 0 == strncasecmp("SET FOREIGN_KEY_CHECKS", query.c_str(), 22) || 0 == strncasecmp("SET AUTOCOMMIT", query.c_str(), 14)
|| 0 == strncasecmp("SET SESSION TRANSACTION ISOLATION LEVEL", query.c_str(), 39);
}
const String MySQLHandler::show_table_status_replacement_query("SELECT"
" name AS Name,"
" engine AS Engine,"
" '10' AS Version,"
" 'Dynamic' AS Row_format,"
" 0 AS Rows,"
" 0 AS Avg_row_length,"
" 0 AS Data_length,"
" 0 AS Max_data_length,"
" 0 AS Index_length,"
" 0 AS Data_free,"
" 'NULL' AS Auto_increment,"
" metadata_modification_time AS Create_time,"
" metadata_modification_time AS Update_time,"
" metadata_modification_time AS Check_time,"
" 'utf8_bin' AS Collation,"
" 'NULL' AS Checksum,"
" '' AS Create_options,"
" '' AS Comment"
" FROM system.tables"
" WHERE name LIKE ");
} }

View File

@ -11,7 +11,6 @@
namespace DB namespace DB
{ {
/// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client. /// Handler for MySQL wire protocol connections. Allows to connect to ClickHouse using MySQL client.
class MySQLHandler : public Poco::Net::TCPServerConnection class MySQLHandler : public Poco::Net::TCPServerConnection
{ {
@ -59,6 +58,9 @@ protected:
std::shared_ptr<WriteBuffer> out; std::shared_ptr<WriteBuffer> out;
bool secure_connection = false; bool secure_connection = false;
private:
static const String show_table_status_replacement_query;
}; };
#if USE_SSL && USE_POCO_NETSSL #if USE_SSL && USE_POCO_NETSSL

View File

@ -219,6 +219,7 @@ public:
Field getField() const { return getDataColumn()[0]; } Field getField() const { return getDataColumn()[0]; }
/// The constant value. It is valid even if the size of the column is 0.
template <typename T> template <typename T>
T getValue() const { return getField().safeGet<NearestFieldType<T>>(); } T getValue() const { return getField().safeGet<NearestFieldType<T>>(); }
}; };

View File

@ -144,7 +144,7 @@ public:
} }
void insert(const T value) { data.push_back(value); } void insertValue(const T value) { data.push_back(value); }
Container & getData() { return data; } Container & getData() { return data; }
const Container & getData() const { return data; } const Container & getData() const { return data; }
const T & getElement(size_t n) const { return data[n]; } const T & getElement(size_t n) const { return data[n]; }

View File

@ -469,7 +469,6 @@ namespace ErrorCodes
extern const int POCO_EXCEPTION = 1000; extern const int POCO_EXCEPTION = 1000;
extern const int STD_EXCEPTION = 1001; extern const int STD_EXCEPTION = 1001;
extern const int UNKNOWN_EXCEPTION = 1002; extern const int UNKNOWN_EXCEPTION = 1002;
extern const int METRIKA_OTHER_ERROR = 1003;
extern const int CONDITIONAL_TREE_PARENT_NOT_FOUND = 2001; extern const int CONDITIONAL_TREE_PARENT_NOT_FOUND = 2001;
extern const int ILLEGAL_PROJECTION_MANIPULATOR = 2002; extern const int ILLEGAL_PROJECTION_MANIPULATOR = 2002;

View File

@ -17,7 +17,6 @@ namespace DB
namespace ErrorCodes namespace ErrorCodes
{ {
extern const int POCO_EXCEPTION; extern const int POCO_EXCEPTION;
extern const int METRIKA_OTHER_ERROR;
} }
class Exception : public Poco::Exception class Exception : public Poco::Exception

View File

@ -84,6 +84,23 @@ struct DefaultHash<T, std::enable_if_t<is_arithmetic_v<T>>>
} }
}; };
template <typename T>
struct DefaultHash<T, std::enable_if_t<DB::IsDecimalNumber<T> && sizeof(T) <= 8>>
{
size_t operator() (T key) const
{
return DefaultHash64<typename T::NativeType>(key);
}
};
template <typename T>
struct DefaultHash<T, std::enable_if_t<DB::IsDecimalNumber<T> && sizeof(T) == 16>>
{
size_t operator() (T key) const
{
return DefaultHash64<Int64>(key >> 64) ^ DefaultHash64<Int64>(key);
}
};
template <typename T> struct HashCRC32; template <typename T> struct HashCRC32;

View File

@ -158,7 +158,7 @@ std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext
break; break;
} }
case SIGPROF: case SIGTSTP:
{ {
error << "This is a signal used for debugging purposes by the user."; error << "This is a signal used for debugging purposes by the user.";
break; break;

View File

@ -100,4 +100,71 @@ size_t getLengthEncodedStringSize(const String & s)
return getLengthEncodedNumberSize(s.size()) + s.size(); return getLengthEncodedNumberSize(s.size()) + s.size();
} }
ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex type_index)
{
ColumnType column_type;
int flags = 0;
switch (type_index)
{
case TypeIndex::UInt8:
column_type = ColumnType::MYSQL_TYPE_TINY;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
break;
case TypeIndex::UInt16:
column_type = ColumnType::MYSQL_TYPE_SHORT;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
break;
case TypeIndex::UInt32:
column_type = ColumnType::MYSQL_TYPE_LONG;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
break;
case TypeIndex::UInt64:
column_type = ColumnType::MYSQL_TYPE_LONGLONG;
flags = ColumnDefinitionFlags::BINARY_FLAG | ColumnDefinitionFlags::UNSIGNED_FLAG;
break;
case TypeIndex::Int8:
column_type = ColumnType::MYSQL_TYPE_TINY;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Int16:
column_type = ColumnType::MYSQL_TYPE_SHORT;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Int32:
column_type = ColumnType::MYSQL_TYPE_LONG;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Int64:
column_type = ColumnType::MYSQL_TYPE_LONGLONG;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Float32:
column_type = ColumnType::MYSQL_TYPE_FLOAT;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Float64:
column_type = ColumnType::MYSQL_TYPE_TINY;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::Date:
column_type = ColumnType::MYSQL_TYPE_DATE;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::DateTime:
column_type = ColumnType::MYSQL_TYPE_DATETIME;
flags = ColumnDefinitionFlags::BINARY_FLAG;
break;
case TypeIndex::String:
column_type = ColumnType::MYSQL_TYPE_STRING;
break;
case TypeIndex::FixedString:
column_type = ColumnType::MYSQL_TYPE_STRING;
break;
default:
column_type = ColumnType::MYSQL_TYPE_STRING;
break;
}
return ColumnDefinition(column_name, CharacterSet::binary, 0, column_type, flags, 0);
}
} }

View File

@ -130,6 +130,14 @@ enum ColumnType
}; };
// https://dev.mysql.com/doc/dev/mysql-server/latest/group__group__cs__column__definition__flags.html
enum ColumnDefinitionFlags
{
UNSIGNED_FLAG = 32,
BINARY_FLAG = 128
};
class ProtocolError : public DB::Exception class ProtocolError : public DB::Exception
{ {
public: public:
@ -824,19 +832,40 @@ protected:
} }
}; };
ColumnDefinition getColumnDefinition(const String & column_name, const TypeIndex index);
namespace ProtocolText
{
class ResultsetRow : public WritePacket class ResultsetRow : public WritePacket
{ {
std::vector<String> columns; const Columns & columns;
int row_num;
size_t payload_size = 0; size_t payload_size = 0;
std::vector<String> serialized;
public: public:
ResultsetRow() = default; ResultsetRow(const DataTypes & data_types, const Columns & columns_, int row_num_)
: columns(columns_)
void appendColumn(String && value) , row_num(row_num_)
{ {
payload_size += getLengthEncodedStringSize(value); for (size_t i = 0; i < columns.size(); i++)
columns.emplace_back(std::move(value)); {
if (columns[i]->isNullAt(row_num))
{
payload_size += 1;
serialized.emplace_back("\xfb");
}
else
{
WriteBufferFromOwnString ostr;
data_types[i]->serializeAsText(*columns[i], row_num, ostr, FormatSettings());
payload_size += getLengthEncodedStringSize(ostr.str());
serialized.push_back(std::move(ostr.str()));
}
}
} }
protected: protected:
size_t getPayloadSize() const override size_t getPayloadSize() const override
{ {
@ -845,11 +874,18 @@ protected:
void writePayloadImpl(WriteBuffer & buffer) const override void writePayloadImpl(WriteBuffer & buffer) const override
{ {
for (const String & column : columns) for (size_t i = 0; i < columns.size(); i++)
writeLengthEncodedString(column, buffer); {
if (columns[i]->isNullAt(row_num))
buffer.write(serialized[i].data(), 1);
else
writeLengthEncodedString(serialized[i], buffer);
}
} }
}; };
}
namespace Authentication namespace Authentication
{ {

View File

@ -76,6 +76,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \ M(SettingBool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ M(SettingBool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \ M(SettingUInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(SettingUInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \ M(SettingUInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables. Only has meaning at server startup.", 0) \
\ \
M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \ M(SettingMilliseconds, distributed_directory_monitor_sleep_time_ms, 100, "Sleep time for StorageDistributed DirectoryMonitors, in case of any errors delay grows exponentially.", 0) \

View File

@ -5,6 +5,9 @@
namespace DB namespace DB
{ {
using TypeListNumbers = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64>; using TypeListNativeNumbers = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64>;
using TypeListDecimalNumbers = TypeList<Decimal32, Decimal64, Decimal128>;
using TypeListNumbers = TypeList<UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64,
Decimal32, Decimal64, Decimal128>;
} }

View File

@ -57,6 +57,13 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
} }
} }
void NativeBlockInputStream::resetParser()
{
istr_concrete = nullptr;
use_index = false;
header.clear();
avg_value_size_hints.clear();
}
void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint) void NativeBlockInputStream::readData(const IDataType & type, IColumn & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{ {

View File

@ -78,6 +78,9 @@ public:
Block getHeader() const override; Block getHeader() const override;
void resetParser();
protected: protected:
Block readImpl() override; Block readImpl() override;

View File

@ -894,7 +894,7 @@ MutableColumnUniquePtr DataTypeLowCardinality::createColumnUniqueImpl(const IDat
if (isColumnedAsNumber(type)) if (isColumnedAsNumber(type))
{ {
MutableColumnUniquePtr column; MutableColumnUniquePtr column;
TypeListNumbers::forEach(CreateColumnVector(column, *type, creator)); TypeListNativeNumbers::forEach(CreateColumnVector(column, *type, creator));
if (!column) if (!column)
throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR); throw Exception("Unexpected numeric type: " + type->getName(), ErrorCodes::LOGICAL_ERROR);

View File

@ -361,9 +361,8 @@ StoragePtr DatabaseLazy::loadTable(const Context & context, const String & table
} }
catch (const Exception & e) catch (const Exception & e)
{ {
throw Exception("Cannot create table from metadata file " + table_metadata_path + ", error: " + e.displayText() + throw Exception("Cannot create table from metadata file " + table_metadata_path + ". Error: " + DB::getCurrentExceptionMessage(true),
", stack trace:\n" + e.getStackTrace().toString(), e, DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
} }
} }

View File

@ -27,6 +27,7 @@
#include <Poco/Event.h> #include <Poco/Event.h>
#include <Common/Stopwatch.h> #include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h> #include <Common/StringUtils/StringUtils.h>
#include <Common/quoteString.h>
#include <Common/ThreadPool.h> #include <Common/ThreadPool.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
@ -81,9 +82,8 @@ try
catch (const Exception & e) catch (const Exception & e)
{ {
throw Exception( throw Exception(
"Cannot create object '" + query.table + "' from query " + serializeAST(query) + ", error: " + e.displayText() + ", stack trace:\n" "Cannot create object '" + query.table + "' from query " + serializeAST(query) + ". Error: " + DB::getCurrentExceptionMessage(true),
+ e.getStackTrace().toString(), e, DB::ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
ErrorCodes::CANNOT_CREATE_TABLE_FROM_METADATA);
} }
@ -138,8 +138,7 @@ void DatabaseOrdinary::loadStoredObjects(
catch (const Exception & e) catch (const Exception & e)
{ {
throw Exception( throw Exception(
"Cannot parse definition from metadata file " + full_path + ", error: " + e.displayText() + ", stack trace:\n" "Cannot parse definition from metadata file " + full_path + ". Error: " + DB::getCurrentExceptionMessage(true), e, ErrorCodes::CANNOT_PARSE_TEXT);
+ e.getStackTrace().toString(), ErrorCodes::CANNOT_PARSE_TEXT);
} }
}); });
@ -180,7 +179,15 @@ void DatabaseOrdinary::loadStoredObjects(
auto & external_loader = context.getExternalDictionariesLoader(); auto & external_loader = context.getExternalDictionariesLoader();
external_loader.addConfigRepository(getDatabaseName(), std::move(dictionaries_repository)); external_loader.addConfigRepository(getDatabaseName(), std::move(dictionaries_repository));
bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true); bool lazy_load = context.getConfigRef().getBool("dictionaries_lazy_load", true);
external_loader.reload(!lazy_load);
auto filter = [this](const std::string & dictionary_name) -> bool
{
if (!startsWith(dictionary_name, name + "." /* db name */))
return false;
LOG_INFO(log, "Loading dictionary " << backQuote(dictionary_name) << ", for database " << backQuote(name));
return true;
};
external_loader.reload(filter, !lazy_load);
} }

View File

@ -48,7 +48,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; } double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; }
bool isCached() const override { return true; } bool supportUpdates() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {

View File

@ -71,7 +71,7 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; } double getLoadFactor() const override { return static_cast<double>(element_count.load(std::memory_order_relaxed)) / size; }
bool isCached() const override { return true; } bool supportUpdates() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {

View File

@ -46,8 +46,6 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; } double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<ComplexKeyHashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block); return std::make_shared<ComplexKeyHashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);

View File

@ -43,8 +43,6 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; } double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<FlatDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block); return std::make_shared<FlatDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, saved_block);

View File

@ -48,8 +48,6 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; } double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<HashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, sparse, saved_block); return std::make_shared<HashedDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty, sparse, saved_block);

View File

@ -37,8 +37,6 @@ struct IDictionaryBase : public IExternalLoadable
virtual double getLoadFactor() const = 0; virtual double getLoadFactor() const = 0;
virtual bool isCached() const = 0;
virtual const IDictionarySource * getSource() const = 0; virtual const IDictionarySource * getSource() const = 0;
virtual const DictionaryStructure & getStructure() const = 0; virtual const DictionaryStructure & getStructure() const = 0;
@ -47,7 +45,7 @@ struct IDictionaryBase : public IExternalLoadable
virtual BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const = 0; virtual BlockInputStreamPtr getBlockInputStream(const Names & column_names, size_t max_block_size) const = 0;
bool supportUpdates() const override { return !isCached(); } bool supportUpdates() const override { return true; }
bool isModified() const override bool isModified() const override
{ {

View File

@ -38,8 +38,6 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; } double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<RangeHashedDictionary>(dictionary_name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty); return std::make_shared<RangeHashedDictionary>(dictionary_name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);

View File

@ -47,8 +47,6 @@ public:
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; } double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
bool isCached() const override { return false; }
std::shared_ptr<const IExternalLoadable> clone() const override std::shared_ptr<const IExternalLoadable> clone() const override
{ {
return std::make_shared<TrieDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty); return std::make_shared<TrieDictionary>(name, dict_struct, source_ptr->clone(), dict_lifetime, require_nonempty);

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <Core/Types.h>
#include <Common/FieldVisitors.h> #include <Common/FieldVisitors.h>
#include "Sources.h" #include "Sources.h"
#include "Sinks.h" #include "Sinks.h"
@ -78,10 +79,18 @@ template <typename T>
inline ALWAYS_INLINE void writeSlice(const NumericArraySlice<T> & slice, GenericArraySink & sink) inline ALWAYS_INLINE void writeSlice(const NumericArraySlice<T> & slice, GenericArraySink & sink)
{ {
for (size_t i = 0; i < slice.size; ++i) for (size_t i = 0; i < slice.size; ++i)
{
if constexpr (IsDecimalNumber<T>)
{
DecimalField field(T(slice.data[i]), 0); /// TODO: Decimal scale
sink.elements.insert(field);
}
else
{ {
Field field = T(slice.data[i]); Field field = T(slice.data[i]);
sink.elements.insert(field); sink.elements.insert(field);
} }
}
sink.current_offset += slice.size; sink.current_offset += slice.size;
} }
@ -422,8 +431,17 @@ bool sliceHasImpl(const FirstSliceType & first, const SecondSliceType & second,
} }
template <typename T, typename U> template <typename T, typename U>
bool sliceEqualElements(const NumericArraySlice<T> & first, const NumericArraySlice<U> & second, size_t first_ind, size_t second_ind) bool sliceEqualElements(const NumericArraySlice<T> & first [[maybe_unused]],
const NumericArraySlice<U> & second [[maybe_unused]],
size_t first_ind [[maybe_unused]],
size_t second_ind [[maybe_unused]])
{ {
/// TODO: Decimal scale
if constexpr (IsDecimalNumber<T> && IsDecimalNumber<U>)
return accurate::equalsOp(typename T::NativeType(first.data[first_ind]), typename U::NativeType(second.data[second_ind]));
else if constexpr (IsDecimalNumber<T> || IsDecimalNumber<U>)
return false;
else
return accurate::equalsOp(first.data[first_ind], second.data[second_ind]); return accurate::equalsOp(first.data[first_ind], second.data[second_ind]);
} }

View File

@ -3,6 +3,7 @@
#include "IArraySink.h" #include "IArraySink.h"
#include <Columns/ColumnVector.h> #include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
@ -33,17 +34,18 @@ struct NullableValueSource;
template <typename T> template <typename T>
struct NumericArraySink : public ArraySinkImpl<NumericArraySink<T>> struct NumericArraySink : public ArraySinkImpl<NumericArraySink<T>>
{ {
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
using CompatibleArraySource = NumericArraySource<T>; using CompatibleArraySource = NumericArraySource<T>;
using CompatibleValueSource = NumericValueSource<T>; using CompatibleValueSource = NumericValueSource<T>;
typename ColumnVector<T>::Container & elements; typename ColVecType::Container & elements;
typename ColumnArray::Offsets & offsets; typename ColumnArray::Offsets & offsets;
size_t row_num = 0; size_t row_num = 0;
ColumnArray::Offset current_offset = 0; ColumnArray::Offset current_offset = 0;
NumericArraySink(ColumnArray & arr, size_t column_size) NumericArraySink(ColumnArray & arr, size_t column_size)
: elements(typeid_cast<ColumnVector<T> &>(arr.getData()).getData()), offsets(arr.getOffsets()) : elements(typeid_cast<ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
{ {
offsets.resize(column_size); offsets.resize(column_size);
} }

View File

@ -1,6 +1,7 @@
#pragma once #pragma once
#include <Columns/ColumnVector.h> #include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
@ -30,17 +31,18 @@ namespace GatherUtils
template <typename T> template <typename T>
struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>> struct NumericArraySource : public ArraySourceImpl<NumericArraySource<T>>
{ {
using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
using Slice = NumericArraySlice<T>; using Slice = NumericArraySlice<T>;
using Column = ColumnArray; using Column = ColumnArray;
const typename ColumnVector<T>::Container & elements; const typename ColVecType::Container & elements;
const typename ColumnArray::Offsets & offsets; const typename ColumnArray::Offsets & offsets;
size_t row_num = 0; size_t row_num = 0;
ColumnArray::Offset prev_offset = 0; ColumnArray::Offset prev_offset = 0;
explicit NumericArraySource(const ColumnArray & arr) explicit NumericArraySource(const ColumnArray & arr)
: elements(typeid_cast<const ColumnVector<T> &>(arr.getData()).getData()), offsets(arr.getOffsets()) : elements(typeid_cast<const ColVecType &>(arr.getData()).getData()), offsets(arr.getOffsets())
{ {
} }
@ -650,7 +652,7 @@ template <typename T>
struct NumericValueSource : ValueSourceImpl<NumericValueSource<T>> struct NumericValueSource : ValueSourceImpl<NumericValueSource<T>>
{ {
using Slice = NumericValueSlice<T>; using Slice = NumericValueSlice<T>;
using Column = ColumnVector<T>; using Column = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
const T * begin; const T * begin;
size_t total_rows; size_t total_rows;

View File

@ -14,7 +14,9 @@ struct ArraySinkCreator<Type, Types...>
{ {
static std::unique_ptr<IArraySink> create(ColumnArray & col, NullMap * null_map, size_t column_size) static std::unique_ptr<IArraySink> create(ColumnArray & col, NullMap * null_map, size_t column_size)
{ {
if (typeid_cast<ColumnVector<Type> *>(&col.getData())) using ColVecType = std::conditional_t<IsDecimalNumber<Type>, ColumnDecimal<Type>, ColumnVector<Type>>;
if (typeid_cast<ColVecType *>(&col.getData()))
{ {
if (null_map) if (null_map)
return std::make_unique<NullableArraySink<NumericArraySink<Type>>>(col, *null_map, column_size); return std::make_unique<NullableArraySink<NumericArraySink<Type>>>(col, *null_map, column_size);

View File

@ -14,7 +14,9 @@ struct ArraySourceCreator<Type, Types...>
{ {
static std::unique_ptr<IArraySource> create(const ColumnArray & col, const NullMap * null_map, bool is_const, size_t total_rows) static std::unique_ptr<IArraySource> create(const ColumnArray & col, const NullMap * null_map, bool is_const, size_t total_rows)
{ {
if (typeid_cast<const ColumnVector<Type> *>(&col.getData())) using ColVecType = std::conditional_t<IsDecimalNumber<Type>, ColumnDecimal<Type>, ColumnVector<Type>>;
if (typeid_cast<const ColVecType *>(&col.getData()))
{ {
if (null_map) if (null_map)
{ {

View File

@ -14,7 +14,9 @@ struct ValueSourceCreator<Type, Types...>
{ {
static std::unique_ptr<IValueSource> create(const IColumn & col, const NullMap * null_map, bool is_const, size_t total_rows) static std::unique_ptr<IValueSource> create(const IColumn & col, const NullMap * null_map, bool is_const, size_t total_rows)
{ {
if (auto column_vector = typeid_cast<const ColumnVector<Type> *>(&col)) using ColVecType = std::conditional_t<IsDecimalNumber<Type>, ColumnDecimal<Type>, ColumnVector<Type>>;
if (auto column_vector = typeid_cast<const ColVecType *>(&col))
{ {
if (null_map) if (null_map)
{ {

View File

@ -590,7 +590,7 @@ struct CallPointInPolygon<Type, Types ...>
template <typename PointInPolygonImpl> template <typename PointInPolygonImpl>
static ColumnPtr call(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl) static ColumnPtr call(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl)
{ {
using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNumbers>::Type; using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNativeNumbers>::Type;
if (auto column = typeid_cast<const ColumnVector<Type> *>(&x)) if (auto column = typeid_cast<const ColumnVector<Type> *>(&x))
return Impl::template call<Type>(*column, y, impl); return Impl::template call<Type>(*column, y, impl);
return CallPointInPolygon<Types ...>::call(x, y, impl); return CallPointInPolygon<Types ...>::call(x, y, impl);
@ -616,7 +616,7 @@ struct CallPointInPolygon<>
template <typename PointInPolygonImpl> template <typename PointInPolygonImpl>
ColumnPtr pointInPolygon(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl) ColumnPtr pointInPolygon(const IColumn & x, const IColumn & y, PointInPolygonImpl && impl)
{ {
using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNumbers>::Type; using Impl = typename ApplyTypeListForClass<::DB::GeoUtils::CallPointInPolygon, TypeListNativeNumbers>::Type;
return Impl::call(x, y, impl); return Impl::call(x, y, impl);
} }

View File

@ -1,5 +1,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include <Functions/array/FunctionArrayMapped.h> #include <Functions/array/FunctionArrayMapped.h>
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
@ -27,16 +29,23 @@ struct ArrayCompactImpl
template <typename T> template <typename T>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr) static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{ {
const ColumnVector<T> * src_values_column = checkAndGetColumn<ColumnVector<T>>(mapped.get()); using ColVecType = std::conditional_t<IsDecimalNumber<T>, ColumnDecimal<T>, ColumnVector<T>>;
const ColVecType * src_values_column = checkAndGetColumn<ColVecType>(mapped.get());
if (!src_values_column) if (!src_values_column)
return false; return false;
const IColumn::Offsets & src_offsets = array.getOffsets(); const IColumn::Offsets & src_offsets = array.getOffsets();
const typename ColumnVector<T>::Container & src_values = src_values_column->getData(); const typename ColVecType::Container & src_values = src_values_column->getData();
auto res_values_column = ColumnVector<T>::create(src_values.size()); typename ColVecType::MutablePtr res_values_column;
typename ColumnVector<T>::Container & res_values = res_values_column->getData(); if constexpr (IsDecimalNumber<T>)
res_values_column = ColVecType::create(src_values.size(), src_values.getScale());
else
res_values_column = ColVecType::create(src_values.size());
typename ColVecType::Container & res_values = res_values_column->getData();
size_t src_offsets_size = src_offsets.size(); size_t src_offsets_size = src_offsets.size();
auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size); auto res_offsets_column = ColumnArray::ColumnOffsets::create(src_offsets_size);
IColumn::Offsets & res_offsets = res_offsets_column->getData(); IColumn::Offsets & res_offsets = res_offsets_column->getData();
@ -129,7 +138,10 @@ struct ArrayCompactImpl
executeType< Int32 >(mapped, array, res) || executeType< Int32 >(mapped, array, res) ||
executeType< Int64 >(mapped, array, res) || executeType< Int64 >(mapped, array, res) ||
executeType<Float32>(mapped, array, res) || executeType<Float32>(mapped, array, res) ||
executeType<Float64>(mapped, array, res))) executeType<Float64>(mapped, array, res)) ||
executeType<Decimal32>(mapped, array, res) ||
executeType<Decimal64>(mapped, array, res) ||
executeType<Decimal128>(mapped, array, res))
{ {
executeGeneric(mapped, array, res); executeGeneric(mapped, array, res);
} }

View File

@ -1,5 +1,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h" #include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
@ -31,6 +33,13 @@ struct ArrayCumSumImpl
if (which.isFloat()) if (which.isFloat())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>()); return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
if (which.isDecimal())
{
UInt32 scale = getDecimalScale(*expression_return);
DataTypePtr nested = std::make_shared<DataTypeDecimal<Decimal128>>(maxDecimalPrecision<Decimal128>(), scale);
return std::make_shared<DataTypeArray>(nested);
}
throw Exception("arrayCumSum cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("arrayCumSum cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
@ -38,11 +47,14 @@ struct ArrayCumSumImpl
template <typename Element, typename Result> template <typename Element, typename Result>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr) static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{ {
const ColumnVector<Element> * column = checkAndGetColumn<ColumnVector<Element>>(&*mapped); using ColVecType = std::conditional_t<IsDecimalNumber<Element>, ColumnDecimal<Element>, ColumnVector<Element>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<Result>, ColumnDecimal<Result>, ColumnVector<Result>>;
const ColVecType * column = checkAndGetColumn<ColVecType>(&*mapped);
if (!column) if (!column)
{ {
const ColumnConst * column_const = checkAndGetColumnConst<ColumnVector<Element>>(&*mapped); const ColumnConst * column_const = checkAndGetColumnConst<ColVecType>(&*mapped);
if (!column_const) if (!column_const)
return false; return false;
@ -50,8 +62,17 @@ struct ArrayCumSumImpl
const Element x = column_const->template getValue<Element>(); const Element x = column_const->template getValue<Element>();
const IColumn::Offsets & offsets = array.getOffsets(); const IColumn::Offsets & offsets = array.getOffsets();
auto res_nested = ColumnVector<Result>::create(); typename ColVecResult::MutablePtr res_nested;
typename ColumnVector<Result>::Container & res_values = res_nested->getData(); if constexpr (IsDecimalNumber<Element>)
{
const typename ColVecType::Container & data =
checkAndGetColumn<ColVecType>(&column_const->getDataColumn())->getData();
res_nested = ColVecResult::create(0, data.getScale());
}
else
res_nested = ColVecResult::create();
typename ColVecResult::Container & res_values = res_nested->getData();
res_values.resize(column_const->size()); res_values.resize(column_const->size());
size_t pos = 0; size_t pos = 0;
@ -72,11 +93,16 @@ struct ArrayCumSumImpl
return true; return true;
} }
const typename ColVecType::Container & data = column->getData();
const IColumn::Offsets & offsets = array.getOffsets(); const IColumn::Offsets & offsets = array.getOffsets();
const typename ColumnVector<Element>::Container & data = column->getData();
auto res_nested = ColumnVector<Result>::create(); typename ColVecResult::MutablePtr res_nested;
typename ColumnVector<Result>::Container & res_values = res_nested->getData(); if constexpr (IsDecimalNumber<Element>)
res_nested = ColVecResult::create(0, data.getScale());
else
res_nested = ColVecResult::create();
typename ColVecResult::Container & res_values = res_nested->getData();
res_values.resize(data.size()); res_values.resize(data.size());
size_t pos = 0; size_t pos = 0;
@ -110,7 +136,10 @@ struct ArrayCumSumImpl
executeType< Int32, Int64>(mapped, array, res) || executeType< Int32, Int64>(mapped, array, res) ||
executeType< Int64, Int64>(mapped, array, res) || executeType< Int64, Int64>(mapped, array, res) ||
executeType<Float32,Float64>(mapped, array, res) || executeType<Float32,Float64>(mapped, array, res) ||
executeType<Float64,Float64>(mapped, array, res)) executeType<Float64,Float64>(mapped, array, res) ||
executeType<Decimal32, Decimal128>(mapped, array, res) ||
executeType<Decimal64, Decimal128>(mapped, array, res) ||
executeType<Decimal128, Decimal128>(mapped, array, res))
return res; return res;
else else
throw Exception("Unexpected column for arrayCumSum: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception("Unexpected column for arrayCumSum: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);

View File

@ -1,5 +1,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h" #include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
@ -34,6 +36,13 @@ struct ArrayCumSumNonNegativeImpl
if (which.isFloat()) if (which.isFloat())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>()); return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
if (which.isDecimal())
{
UInt32 scale = getDecimalScale(*expression_return);
DataTypePtr nested = std::make_shared<DataTypeDecimal<Decimal128>>(maxDecimalPrecision<Decimal128>(), scale);
return std::make_shared<DataTypeArray>(nested);
}
throw Exception("arrayCumSumNonNegativeImpl cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("arrayCumSumNonNegativeImpl cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
@ -41,16 +50,24 @@ struct ArrayCumSumNonNegativeImpl
template <typename Element, typename Result> template <typename Element, typename Result>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr) static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{ {
const ColumnVector<Element> * column = checkAndGetColumn<ColumnVector<Element>>(&*mapped); using ColVecType = std::conditional_t<IsDecimalNumber<Element>, ColumnDecimal<Element>, ColumnVector<Element>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<Result>, ColumnDecimal<Result>, ColumnVector<Result>>;
const ColVecType * column = checkAndGetColumn<ColVecType>(&*mapped);
if (!column) if (!column)
return false; return false;
const IColumn::Offsets & offsets = array.getOffsets(); const IColumn::Offsets & offsets = array.getOffsets();
const typename ColumnVector<Element>::Container & data = column->getData(); const typename ColVecType::Container & data = column->getData();
auto res_nested = ColumnVector<Result>::create(); typename ColVecResult::MutablePtr res_nested;
typename ColumnVector<Result>::Container & res_values = res_nested->getData(); if constexpr (IsDecimalNumber<Element>)
res_nested = ColVecResult::create(0, data.getScale());
else
res_nested = ColVecResult::create();
typename ColVecResult::Container & res_values = res_nested->getData();
res_values.resize(data.size()); res_values.resize(data.size());
size_t pos = 0; size_t pos = 0;
@ -60,7 +77,7 @@ struct ArrayCumSumNonNegativeImpl
// skip empty arrays // skip empty arrays
if (pos < offsets[i]) if (pos < offsets[i])
{ {
accum_sum = data[pos] > 0 ? data[pos] : 0; accum_sum = data[pos] > 0 ? data[pos] : Element(0);
res_values[pos] = accum_sum; res_values[pos] = accum_sum;
for (++pos; pos < offsets[i]; ++pos) for (++pos; pos < offsets[i]; ++pos)
{ {
@ -90,7 +107,10 @@ struct ArrayCumSumNonNegativeImpl
executeType< Int32, Int64>(mapped, array, res) || executeType< Int32, Int64>(mapped, array, res) ||
executeType< Int64, Int64>(mapped, array, res) || executeType< Int64, Int64>(mapped, array, res) ||
executeType<Float32,Float64>(mapped, array, res) || executeType<Float32,Float64>(mapped, array, res) ||
executeType<Float64,Float64>(mapped, array, res)) executeType<Float64,Float64>(mapped, array, res) ||
executeType<Decimal32, Decimal128>(mapped, array, res) ||
executeType<Decimal64, Decimal128>(mapped, array, res) ||
executeType<Decimal128, Decimal128>(mapped, array, res))
return res; return res;
else else
throw Exception("Unexpected column for arrayCumSumNonNegativeImpl: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception("Unexpected column for arrayCumSumNonNegativeImpl: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);

View File

@ -1,5 +1,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h" #include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
@ -37,6 +39,9 @@ struct ArrayDifferenceImpl
if (which.isFloat32() || which.isFloat64()) if (which.isFloat32() || which.isFloat64())
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>()); return std::make_shared<DataTypeArray>(std::make_shared<DataTypeFloat64>());
if (which.isDecimal())
return std::make_shared<DataTypeArray>(expression_return);
throw Exception("arrayDifference cannot process values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("arrayDifference cannot process values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
@ -44,16 +49,24 @@ struct ArrayDifferenceImpl
template <typename Element, typename Result> template <typename Element, typename Result>
static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr) static bool executeType(const ColumnPtr & mapped, const ColumnArray & array, ColumnPtr & res_ptr)
{ {
const ColumnVector<Element> * column = checkAndGetColumn<ColumnVector<Element>>(&*mapped); using ColVecType = std::conditional_t<IsDecimalNumber<Element>, ColumnDecimal<Element>, ColumnVector<Element>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<Result>, ColumnDecimal<Result>, ColumnVector<Result>>;
const ColVecType * column = checkAndGetColumn<ColVecType>(&*mapped);
if (!column) if (!column)
return false; return false;
const IColumn::Offsets & offsets = array.getOffsets(); const IColumn::Offsets & offsets = array.getOffsets();
const typename ColumnVector<Element>::Container & data = column->getData(); const typename ColVecType::Container & data = column->getData();
auto res_nested = ColumnVector<Result>::create(); typename ColVecResult::MutablePtr res_nested;
typename ColumnVector<Result>::Container & res_values = res_nested->getData(); if constexpr (IsDecimalNumber<Element>)
res_nested = ColVecResult::create(0, data.getScale());
else
res_nested = ColVecResult::create();
typename ColVecResult::Container & res_values = res_nested->getData();
res_values.resize(data.size()); res_values.resize(data.size());
size_t pos = 0; size_t pos = 0;
@ -87,7 +100,10 @@ struct ArrayDifferenceImpl
executeType< Int32, Int64>(mapped, array, res) || executeType< Int32, Int64>(mapped, array, res) ||
executeType< Int64, Int64>(mapped, array, res) || executeType< Int64, Int64>(mapped, array, res) ||
executeType<Float32,Float64>(mapped, array, res) || executeType<Float32,Float64>(mapped, array, res) ||
executeType<Float64,Float64>(mapped, array, res)) executeType<Float64,Float64>(mapped, array, res) ||
executeType<Decimal32, Decimal32>(mapped, array, res) ||
executeType<Decimal64, Decimal64>(mapped, array, res) ||
executeType<Decimal128, Decimal128>(mapped, array, res))
return res; return res;
else else
throw Exception("Unexpected column for arrayDifference: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception("Unexpected column for arrayDifference: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypeArray.h> #include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNothing.h> #include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h> #include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeNullable.h> #include <DataTypes/DataTypeNullable.h>
@ -12,6 +13,7 @@
#include <Columns/ColumnArray.h> #include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h> #include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h> #include <Columns/ColumnFixedString.h>
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnNullable.h> #include <Columns/ColumnNullable.h>
#include <Columns/ColumnTuple.h> #include <Columns/ColumnTuple.h>
#include <Common/HashTable/ClearableHashMap.h> #include <Common/HashTable/ClearableHashMap.h>
@ -104,6 +106,19 @@ private:
template <typename T, size_t> template <typename T, size_t>
void operator()(); void operator()();
}; };
struct DecimalExecutor
{
const UnpackedArrays & arrays;
const DataTypePtr & data_type;
ColumnPtr & result;
DecimalExecutor(const UnpackedArrays & arrays_, const DataTypePtr & data_type_, ColumnPtr & result_)
: arrays(arrays_), data_type(data_type_), result(result_) {}
template <typename T, size_t>
void operator()();
};
}; };
@ -399,7 +414,8 @@ void FunctionArrayIntersect::executeImpl(Block & block, const ColumnNumbers & ar
ColumnPtr result_column; ColumnPtr result_column;
auto not_nullable_nested_return_type = removeNullable(nested_return_type); auto not_nullable_nested_return_type = removeNullable(nested_return_type);
TypeListNumbers::forEach(NumberExecutor(arrays, not_nullable_nested_return_type, result_column)); TypeListNativeNumbers::forEach(NumberExecutor(arrays, not_nullable_nested_return_type, result_column));
TypeListDecimalNumbers::forEach(DecimalExecutor(arrays, not_nullable_nested_return_type, result_column));
using DateMap = ClearableHashMap<DataTypeDate::FieldType, size_t, DefaultHash<DataTypeDate::FieldType>, using DateMap = ClearableHashMap<DataTypeDate::FieldType, size_t, DefaultHash<DataTypeDate::FieldType>,
HashTableGrower<INITIAL_SIZE_DEGREE>, HashTableGrower<INITIAL_SIZE_DEGREE>,
@ -445,6 +461,17 @@ void FunctionArrayIntersect::NumberExecutor::operator()()
result = execute<Map, ColumnVector<T>, true>(arrays, ColumnVector<T>::create()); result = execute<Map, ColumnVector<T>, true>(arrays, ColumnVector<T>::create());
} }
template <typename T, size_t>
void FunctionArrayIntersect::DecimalExecutor::operator()()
{
using Map = ClearableHashMap<T, size_t, DefaultHash<T>, HashTableGrower<INITIAL_SIZE_DEGREE>,
HashTableAllocatorWithStackMemory<(1ULL << INITIAL_SIZE_DEGREE) * sizeof(T)>>;
if (!result)
if (auto * decimal = typeid_cast<const DataTypeDecimal<T> *>(data_type.get()))
result = execute<Map, ColumnDecimal<T>, true>(arrays, ColumnDecimal<T>::create(0, decimal->getScale()));
}
template <typename Map, typename ColumnType, bool is_numeric_column> template <typename Map, typename ColumnType, bool is_numeric_column>
ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, MutableColumnPtr result_data_ptr) ColumnPtr FunctionArrayIntersect::execute(const UnpackedArrays & arrays, MutableColumnPtr result_data_ptr)
{ {

View File

@ -37,10 +37,12 @@ struct ArraySplitImpl
size_t pos = 0; size_t pos = 0;
out_offsets_2.reserve(in_offsets.size()); // the actual size would be equal or larger out_offsets_2.reserve(in_offsets.size()); // assume the actual size to be equal or larger
out_offsets_1.reserve(in_offsets.size()); out_offsets_1.reserve(in_offsets.size());
for (size_t i = 0; i < in_offsets.size(); ++i) for (size_t i = 0; i < in_offsets.size(); ++i)
{
if (pos < in_offsets[i])
{ {
pos += !reverse; pos += !reverse;
for (; pos < in_offsets[i] - reverse; ++pos) for (; pos < in_offsets[i] - reverse; ++pos)
@ -51,6 +53,8 @@ struct ArraySplitImpl
pos += reverse; pos += reverse;
out_offsets_2.push_back(pos); out_offsets_2.push_back(pos);
}
out_offsets_1.push_back(out_offsets_2.size()); out_offsets_1.push_back(out_offsets_2.size());
} }
} }
@ -73,13 +77,21 @@ struct ArraySplitImpl
} }
else else
{ {
size_t pos = 0;
out_offsets_2.reserve(in_offsets.size()); out_offsets_2.reserve(in_offsets.size());
out_offsets_1.reserve(in_offsets.size()); out_offsets_1.reserve(in_offsets.size());
for (size_t i = 0; i < in_offsets.size(); ++i) for (size_t i = 0; i < in_offsets.size(); ++i)
{ {
out_offsets_2.push_back(in_offsets[i]); if (pos < in_offsets[i])
out_offsets_1.push_back(i + 1); {
pos = in_offsets[i];
out_offsets_2.push_back(pos);
}
out_offsets_1.push_back(out_offsets_2.size());
} }
} }
} }

View File

@ -1,5 +1,7 @@
#include <DataTypes/DataTypesNumber.h> #include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h> #include <Columns/ColumnsNumber.h>
#include <Columns/ColumnDecimal.h>
#include "FunctionArrayMapped.h" #include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h> #include <Functions/FunctionFactory.h>
@ -31,25 +33,43 @@ struct ArraySumImpl
if (which.isFloat()) if (which.isFloat())
return std::make_shared<DataTypeFloat64>(); return std::make_shared<DataTypeFloat64>();
if (which.isDecimal())
{
UInt32 scale = getDecimalScale(*expression_return);
return std::make_shared<DataTypeDecimal<Decimal128>>(maxDecimalPrecision<Decimal128>(), scale);
}
throw Exception("arraySum cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT); throw Exception("arraySum cannot add values of type " + expression_return->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
} }
template <typename Element, typename Result> template <typename Element, typename Result>
static bool executeType(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr) static bool executeType(const ColumnPtr & mapped, const ColumnArray::Offsets & offsets, ColumnPtr & res_ptr)
{ {
const ColumnVector<Element> * column = checkAndGetColumn<ColumnVector<Element>>(&*mapped); using ColVecType = std::conditional_t<IsDecimalNumber<Element>, ColumnDecimal<Element>, ColumnVector<Element>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<Result>, ColumnDecimal<Result>, ColumnVector<Result>>;
const ColVecType * column = checkAndGetColumn<ColVecType>(&*mapped);
if (!column) if (!column)
{ {
const ColumnConst * column_const = checkAndGetColumnConst<ColumnVector<Element>>(&*mapped); const ColumnConst * column_const = checkAndGetColumnConst<ColVecType>(&*mapped);
if (!column_const) if (!column_const)
return false; return false;
const Element x = column_const->template getValue<Element>(); const Element x = column_const->template getValue<Element>();
auto res_column = ColumnVector<Result>::create(offsets.size()); typename ColVecResult::MutablePtr res_column;
typename ColumnVector<Result>::Container & res = res_column->getData(); if constexpr (IsDecimalNumber<Element>)
{
const typename ColVecType::Container & data =
checkAndGetColumn<ColVecType>(&column_const->getDataColumn())->getData();
res_column = ColVecResult::create(offsets.size(), data.getScale());
}
else
res_column = ColVecResult::create(offsets.size());
typename ColVecResult::Container & res = res_column->getData();
size_t pos = 0; size_t pos = 0;
for (size_t i = 0; i < offsets.size(); ++i) for (size_t i = 0; i < offsets.size(); ++i)
@ -62,9 +82,15 @@ struct ArraySumImpl
return true; return true;
} }
const typename ColumnVector<Element>::Container & data = column->getData(); const typename ColVecType::Container & data = column->getData();
auto res_column = ColumnVector<Result>::create(offsets.size());
typename ColumnVector<Result>::Container & res = res_column->getData(); typename ColVecResult::MutablePtr res_column;
if constexpr (IsDecimalNumber<Element>)
res_column = ColVecResult::create(offsets.size(), data.getScale());
else
res_column = ColVecResult::create(offsets.size());
typename ColVecResult::Container & res = res_column->getData();
size_t pos = 0; size_t pos = 0;
for (size_t i = 0; i < offsets.size(); ++i) for (size_t i = 0; i < offsets.size(); ++i)
@ -95,7 +121,10 @@ struct ArraySumImpl
executeType< Int32, Int64>(mapped, offsets, res) || executeType< Int32, Int64>(mapped, offsets, res) ||
executeType< Int64, Int64>(mapped, offsets, res) || executeType< Int64, Int64>(mapped, offsets, res) ||
executeType<Float32,Float64>(mapped, offsets, res) || executeType<Float32,Float64>(mapped, offsets, res) ||
executeType<Float64,Float64>(mapped, offsets, res)) executeType<Float64,Float64>(mapped, offsets, res) ||
executeType<Decimal32, Decimal128>(mapped, offsets, res) ||
executeType<Decimal64, Decimal128>(mapped, offsets, res) ||
executeType<Decimal128, Decimal128>(mapped, offsets, res))
return res; return res;
else else
throw Exception("Unexpected column for arraySum: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN); throw Exception("Unexpected column for arraySum: " + mapped->getName(), ErrorCodes::ILLEGAL_COLUMN);

View File

@ -10,9 +10,6 @@
#include <math.h> #include <math.h>
#include <array> #include <array>
#define DEGREES_IN_RADIANS (M_PI / 180.0)
#define EARTH_RADIUS_IN_METERS 6372797.560856
namespace DB namespace DB
{ {
@ -24,19 +21,109 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR; extern const int LOGICAL_ERROR;
} }
static inline Float64 degToRad(Float64 angle) { return angle * DEGREES_IN_RADIANS; } namespace
{
const double PI = 3.14159265358979323846;
const float TO_RADF = static_cast<float>(PI / 180.0);
const float TO_RADF2 = static_cast<float>(PI / 360.0);
const int GEODIST_TABLE_COS = 1024; // maxerr 0.00063%
const int GEODIST_TABLE_ASIN = 512;
const int GEODIST_TABLE_K = 1024;
float g_GeoCos[GEODIST_TABLE_COS + 1]; /// cos(x) table
float g_GeoAsin[GEODIST_TABLE_ASIN + 1]; /// asin(sqrt(x)) table
float g_GeoFlatK[GEODIST_TABLE_K + 1][2]; /// geodistAdaptive() flat ellipsoid method k1,k2 coeffs table
inline double sqr(double v)
{
return v * v;
}
inline float fsqr(float v)
{
return v * v;
}
void geodistInit()
{
for (size_t i = 0; i <= GEODIST_TABLE_COS; ++i)
g_GeoCos[i] = static_cast<float>(cos(2 * PI * i / GEODIST_TABLE_COS)); // [0, 2pi] -> [0, COSTABLE]
for (size_t i = 0; i <= GEODIST_TABLE_ASIN; ++i)
g_GeoAsin[i] = static_cast<float>(asin(
sqrt(static_cast<double>(i) / GEODIST_TABLE_ASIN))); // [0, 1] -> [0, ASINTABLE]
for (size_t i = 0; i <= GEODIST_TABLE_K; ++i)
{
double x = PI * i / GEODIST_TABLE_K - PI * 0.5; // [-pi/2, pi/2] -> [0, KTABLE]
g_GeoFlatK[i][0] = static_cast<float>(sqr(111132.09 - 566.05 * cos(2 * x) + 1.20 * cos(4 * x)));
g_GeoFlatK[i][1] = static_cast<float>(sqr(111415.13 * cos(x) - 94.55 * cos(3 * x) + 0.12 * cos(5 * x)));
}
}
inline float geodistDegDiff(float f)
{
f = static_cast<float>(fabs(f));
while (f > 360)
f -= 360;
if (f > 180)
f = 360 - f;
return f;
}
inline float geodistFastCos(float x)
{
float y = static_cast<float>(fabs(x) * GEODIST_TABLE_COS / PI / 2);
int i = static_cast<int>(y);
y -= i;
i &= (GEODIST_TABLE_COS - 1);
return g_GeoCos[i] + (g_GeoCos[i + 1] - g_GeoCos[i]) * y;
}
inline float geodistFastSin(float x)
{
float y = static_cast<float>(fabs(x) * GEODIST_TABLE_COS / PI / 2);
int i = static_cast<int>(y);
y -= i;
i = (i - GEODIST_TABLE_COS / 4) & (GEODIST_TABLE_COS - 1); // cos(x-pi/2)=sin(x), costable/4=pi/2
return g_GeoCos[i] + (g_GeoCos[i + 1] - g_GeoCos[i]) * y;
}
/// fast implementation of asin(sqrt(x))
/// max error in floats 0.00369%, in doubles 0.00072%
inline float geodistFastAsinSqrt(float x)
{
if (x < 0.122)
{
// distance under 4546km, Taylor error under 0.00072%
float y = static_cast<float>(sqrt(x));
return y + x * y * 0.166666666666666f + x * x * y * 0.075f + x * x * x * y * 0.044642857142857f;
}
if (x < 0.948)
{
// distance under 17083km, 512-entry LUT error under 0.00072%
x *= GEODIST_TABLE_ASIN;
int i = static_cast<int>(x);
return g_GeoAsin[i] + (g_GeoAsin[i + 1] - g_GeoAsin[i]) * (x - i);
}
return static_cast<float>(asin(sqrt(x))); // distance over 17083km, just compute honestly
}
}
/** /**
* The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees. * The function calculates distance in meters between two points on Earth specified by longitude and latitude in degrees.
* The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance . * The function uses great circle distance formula https://en.wikipedia.org/wiki/Great-circle_distance .
* Throws exception when one or several input values are not within reasonable bounds. * Throws exception when one or several input values are not within reasonable bounds.
* Latitude must be in [-90, 90], longitude must be [-180, 180] * Latitude must be in [-90, 90], longitude must be [-180, 180].
* * Original code of this implementation of this function is here https://github.com/sphinxsearch/sphinx/blob/409f2c2b5b2ff70b04e38f92b6b1a890326bad65/src/sphinxexpr.cpp#L3825.
* Andrey Aksenov, the author of original code, permitted to use this code in ClickHouse under the Apache 2.0 license.
* Presentation about this code from Highload++ Siberia 2019 is here https://github.com/ClickHouse/ClickHouse/files/3324740/1_._._GEODIST_._.pdf
* The main idea of this implementation is optimisations based on Taylor series, trigonometric identity and calculated constants once for cosine, arcsine(sqrt) and look up table.
*/ */
class FunctionGreatCircleDistance : public IFunction class FunctionGreatCircleDistance : public IFunction
{ {
public: public:
static constexpr auto name = "greatCircleDistance"; static constexpr auto name = "greatCircleDistance";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); } static FunctionPtr create(const Context &) { return std::make_shared<FunctionGreatCircleDistance>(); }
@ -103,16 +190,30 @@ private:
lat1Deg < -90 || lat1Deg > 90 || lat1Deg < -90 || lat1Deg > 90 ||
lat2Deg < -90 || lat2Deg > 90) lat2Deg < -90 || lat2Deg > 90)
{ {
throw Exception("Arguments values out of bounds for function " + getName(), ErrorCodes::ARGUMENT_OUT_OF_BOUND); throw Exception("Arguments values out of bounds for function " + getName(),
ErrorCodes::ARGUMENT_OUT_OF_BOUND);
} }
Float64 lon1Rad = degToRad(lon1Deg); float dlat = geodistDegDiff(lat1Deg - lat2Deg);
Float64 lat1Rad = degToRad(lat1Deg); float dlon = geodistDegDiff(lon1Deg - lon2Deg);
Float64 lon2Rad = degToRad(lon2Deg);
Float64 lat2Rad = degToRad(lat2Deg); if (dlon < 13)
Float64 u = sin((lat2Rad - lat1Rad) / 2); {
Float64 v = sin((lon2Rad - lon1Rad) / 2); // points are close enough; use flat ellipsoid model
return 2.0 * EARTH_RADIUS_IN_METERS * asin(sqrt(u * u + cos(lat1Rad) * cos(lat2Rad) * v * v)); // interpolate sqr(k1), sqr(k2) coefficients using latitudes midpoint
float m = (lat1Deg + lat2Deg + 180) * GEODIST_TABLE_K / 360; // [-90, 90] degrees -> [0, KTABLE] indexes
int i = static_cast<int>(m);
i &= (GEODIST_TABLE_K - 1);
float kk1 = g_GeoFlatK[i][0] + (g_GeoFlatK[i + 1][0] - g_GeoFlatK[i][0]) * (m - i);
float kk2 = g_GeoFlatK[i][1] + (g_GeoFlatK[i + 1][1] - g_GeoFlatK[i][1]) * (m - i);
return static_cast<float>(sqrt(kk1 * dlat * dlat + kk2 * dlon * dlon));
}
// points too far away; use haversine
static const float D = 2 * 6371000;
float a = fsqr(geodistFastSin(dlat * TO_RADF2)) +
geodistFastCos(lat1Deg * TO_RADF) * geodistFastCos(lat2Deg * TO_RADF) *
fsqr(geodistFastSin(dlon * TO_RADF2));
return static_cast<float>(D * geodistFastAsinSqrt(a));
} }
@ -160,6 +261,7 @@ private:
void registerFunctionGreatCircleDistance(FunctionFactory & factory) void registerFunctionGreatCircleDistance(FunctionFactory & factory)
{ {
geodistInit();
factory.registerFunction<FunctionGreatCircleDistance>(); factory.registerFunction<FunctionGreatCircleDistance>();
} }

View File

@ -175,9 +175,7 @@ public:
private: private:
template <typename T0, typename T1> template <typename T0, typename T1>
static constexpr bool allow_arrays = static constexpr bool allow_arrays = !std::is_same_v<T0, UInt128> && !std::is_same_v<T1, UInt128>;
!IsDecimalNumber<T0> && !IsDecimalNumber<T1> &&
!std::is_same_v<T0, UInt128> && !std::is_same_v<T1, UInt128>;
template <typename T0, typename T1> template <typename T0, typename T1>
static UInt32 decimalScale(Block & block [[maybe_unused]], const ColumnNumbers & arguments [[maybe_unused]]) static UInt32 decimalScale(Block & block [[maybe_unused]], const ColumnNumbers & arguments [[maybe_unused]])

View File

@ -140,6 +140,7 @@ struct ContextShared
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections. ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication. InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables. std::optional<BackgroundProcessingPool> background_pool; /// The thread pool for the background work performed by the tables.
std::optional<BackgroundProcessingPool> background_move_pool; /// The thread pool for the background moves performed by the tables.
std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables) std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
MultiVersion<Macros> macros; /// Substitutions extracted from config. MultiVersion<Macros> macros; /// Substitutions extracted from config.
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk. std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
@ -287,6 +288,7 @@ struct ContextShared
external_dictionaries_loader.reset(); external_dictionaries_loader.reset();
external_models_loader.reset(); external_models_loader.reset();
background_pool.reset(); background_pool.reset();
background_move_pool.reset();
schedule_pool.reset(); schedule_pool.reset();
ddl_worker.reset(); ddl_worker.reset();
@ -1489,6 +1491,14 @@ BackgroundProcessingPool & Context::getBackgroundPool()
return *shared->background_pool; return *shared->background_pool;
} }
BackgroundProcessingPool & Context::getBackgroundMovePool()
{
auto lock = getLock();
if (!shared->background_move_pool)
shared->background_move_pool.emplace(settings.background_move_pool_size, "BackgroundMovePool", "BgMoveProcPool");
return *shared->background_move_pool;
}
BackgroundSchedulePool & Context::getSchedulePool() BackgroundSchedulePool & Context::getSchedulePool()
{ {
auto lock = getLock(); auto lock = getLock();

View File

@ -450,6 +450,7 @@ public:
void dropCaches() const; void dropCaches() const;
BackgroundProcessingPool & getBackgroundPool(); BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & getBackgroundMovePool();
BackgroundSchedulePool & getSchedulePool(); BackgroundSchedulePool & getSchedulePool();
void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker); void setDDLWorker(std::unique_ptr<DDLWorker> ddl_worker);

View File

@ -233,9 +233,16 @@ void ExpressionAnalyzer::initGlobalSubqueriesAndExternalTables(bool do_global)
void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name) void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name)
{ {
auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name); auto set_key = PreparedSetKey::forSubquery(*subquery_or_table_name);
if (prepared_sets.count(set_key)) if (prepared_sets.count(set_key))
return; /// Already prepared. return; /// Already prepared.
if (auto set_ptr_from_storage_set = isPlainStorageSetInSubquery(subquery_or_table_name))
{
prepared_sets.insert({set_key, set_ptr_from_storage_set});
return;
}
auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, subquery_depth + 1, {}); auto interpreter_subquery = interpretSubquery(subquery_or_table_name, context, subquery_depth + 1, {});
BlockIO res = interpreter_subquery->execute(); BlockIO res = interpreter_subquery->execute();
@ -256,6 +263,19 @@ void SelectQueryExpressionAnalyzer::tryMakeSetForIndexFromSubquery(const ASTPtr
prepared_sets[set_key] = std::move(set); prepared_sets[set_key] = std::move(set);
} }
SetPtr SelectQueryExpressionAnalyzer::isPlainStorageSetInSubquery(const ASTPtr & subquery_or_table_name)
{
const auto * table = subquery_or_table_name->as<ASTIdentifier>();
if (!table)
return nullptr;
const DatabaseAndTableWithAlias database_table(*table);
const auto storage = context.getTable(database_table.database, database_table.table);
if (storage->getName() != "Set")
return nullptr;
const auto storage_set = std::dynamic_pointer_cast<StorageSet>(storage);
return storage_set->getSet();
}
/// Perfomance optimisation for IN() if storage supports it. /// Perfomance optimisation for IN() if storage supports it.
void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node) void SelectQueryExpressionAnalyzer::makeSetsForIndex(const ASTPtr & node)

View File

@ -219,6 +219,13 @@ private:
*/ */
void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name); void tryMakeSetForIndexFromSubquery(const ASTPtr & subquery_or_table_name);
/**
* Checks if subquery is not a plain StorageSet.
* Because while making set we will read data from StorageSet which is not allowed.
* Returns valid SetPtr from StorageSet if the latter is used after IN or nullptr otherwise.
*/
SetPtr isPlainStorageSetInSubquery(const ASTPtr & subquery_of_table_name);
JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element); JoinPtr makeTableJoin(const ASTTablesInSelectQueryElement & join_element);
void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, NamesWithAliases && required_columns_with_aliases, void makeSubqueryForJoin(const ASTTablesInSelectQueryElement & join_element, NamesWithAliases && required_columns_with_aliases,
SubqueryForSet & subquery_for_set) const; SubqueryForSet & subquery_for_set) const;

View File

@ -975,7 +975,7 @@ private:
/// do not update loadable objects with zero as lifetime /// do not update loadable objects with zero as lifetime
const auto & lifetime = loaded_object->getLifetime(); const auto & lifetime = loaded_object->getLifetime();
if (lifetime.min_sec == 0 || lifetime.max_sec == 0) if (lifetime.min_sec == 0 && lifetime.max_sec == 0)
return never; return never;
if (!error_count) if (!error_count)
@ -1197,6 +1197,12 @@ void ExternalLoader::reload(bool load_never_loading) const
loading_dispatcher->reload(load_never_loading); loading_dispatcher->reload(load_never_loading);
} }
void ExternalLoader::reload(const FilterByNameFunction & filter_by_name, bool load_never_loading) const
{
loading_dispatcher->setConfiguration(config_files_reader->read());
loading_dispatcher->reload(filter_by_name, load_never_loading);
}
void ExternalLoader::addObjectAndLoad( void ExternalLoader::addObjectAndLoad(
const String & name, const String & name,
const String & external_name, const String & external_name,

View File

@ -150,12 +150,16 @@ public:
/// Also function can load dictionary synchronously /// Also function can load dictionary synchronously
void reload(const String & name, bool load_never_loading = false) const; void reload(const String & name, bool load_never_loading = false) const;
/// Starts reloading of all the objects. /// Starts reloading of all the objects.
/// `load_never_loading` specifies what to do with the objects which have never been loading before. /// `load_never_loading` specifies what to do with the objects which have never been loading before.
/// The function can either skip them (false) or load for the first time (true). /// The function can either skip them (false) or load for the first time (true).
void reload(bool load_never_loading = false) const; void reload(bool load_never_loading = false) const;
/// Starts reloading of all objects matched `filter_by_name`.
/// `load_never_loading` specifies what to do with the objects which have never been loading before.
/// The function can either skip them (false) or load for the first time (true).
void reload(const FilterByNameFunction & filter_by_name, bool load_never_loading = false) const;
protected: protected:
virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0; virtual LoadablePtr create(const String & name, const Poco::Util::AbstractConfiguration & config, const String & key_in_config) const = 0;

View File

@ -0,0 +1,30 @@
#include <Processors/Formats/IInputFormat.h>
#include <IO/ReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(in_)
{
}
void IInputFormat::resetParser()
{
if (in.hasPendingData())
throw Exception("Unread data in IInputFormat::resetParser. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
// those are protected attributes from ISource (I didn't want to propagate resetParser up there)
finished = false;
got_exception = false;
getPort().getInputPort().reopen();
}
}

View File

@ -23,10 +23,15 @@ protected:
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
public: public:
IInputFormat(Block header, ReadBuffer & in_) IInputFormat(Block header, ReadBuffer & in_);
: ISource(std::move(header)), in(in_)
{ /** In some usecase (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
} * The recreating of parser for each small stream takes too long, so we introduce a method
* resetParser() which allow to reset the state of parser to continure reading of
* source stream w/o recreating that.
* That should be called after current buffer was fully read.
*/
virtual void resetParser();
virtual const BlockMissingValues & getMissingValues() const virtual const BlockMissingValues & getMissingValues() const
{ {

View File

@ -159,4 +159,13 @@ void IRowInputFormat::syncAfterError()
throw Exception("Method syncAfterError is not implemented for input format", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Method syncAfterError is not implemented for input format", ErrorCodes::NOT_IMPLEMENTED);
} }
void IRowInputFormat::resetParser()
{
IInputFormat::resetParser();
total_rows = 0;
num_errors = 0;
block_missing_values.clear();
}
} }

View File

@ -53,6 +53,8 @@ public:
Chunk generate() override; Chunk generate() override;
void resetParser() override;
protected: protected:
/** Read next row and append it to the columns. /** Read next row and append it to the columns.
* If no more rows - return false. * If no more rows - return false.

View File

@ -405,6 +405,14 @@ bool CSVRowInputFormat::readField(IColumn & column, const DataTypePtr & type, bo
} }
} }
void CSVRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_indexes_for_input_fields.clear();
read_columns.clear();
have_always_default_columns = false;
}
void registerInputFormatProcessorCSV(FormatFactory & factory) void registerInputFormatProcessorCSV(FormatFactory & factory)
{ {

View File

@ -28,6 +28,7 @@ public:
void readPrefix() override; void readPrefix() override;
bool allowSyncAfterError() const override { return true; } bool allowSyncAfterError() const override { return true; }
void syncAfterError() override; void syncAfterError() override;
void resetParser() override;
private: private:
bool with_names; bool with_names;

View File

@ -256,6 +256,15 @@ void JSONEachRowRowInputFormat::syncAfterError()
skipToUnescapedNextLineOrEOF(in); skipToUnescapedNextLineOrEOF(in);
} }
void JSONEachRowRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
nested_prefix_length = 0;
read_columns.clear();
seen_columns.clear();
prev_positions.clear();
}
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory) void registerInputFormatProcessorJSONEachRow(FormatFactory & factory)
{ {

View File

@ -27,6 +27,7 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension & ext) override; bool readRow(MutableColumns & columns, RowReadExtension & ext) override;
bool allowSyncAfterError() const override { return true; } bool allowSyncAfterError() const override { return true; }
void syncAfterError() override; void syncAfterError() override;
void resetParser() override;
private: private:
const String & columnName(size_t i) const; const String & columnName(size_t i) const;

View File

@ -28,18 +28,16 @@ void MySQLOutputFormat::initialize()
initialized = true; initialized = true;
auto & header = getPort(PortKind::Main).getHeader(); auto & header = getPort(PortKind::Main).getHeader();
data_types = header.getDataTypes();
if (header.columns()) if (header.columns())
{ {
packet_sender.sendPacket(LengthEncodedNumber(header.columns())); packet_sender.sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName()) for (size_t i = 0; i < header.columns(); i++)
{ {
ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING, const auto & column_name = header.getColumnsWithTypeAndName()[i].name;
0, 0); packet_sender.sendPacket(getColumnDefinition(column_name, data_types[i]->getTypeId()));
packet_sender.sendPacket(column_definition);
} }
if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF)) if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
@ -52,22 +50,9 @@ void MySQLOutputFormat::initialize()
void MySQLOutputFormat::consume(Chunk chunk) void MySQLOutputFormat::consume(Chunk chunk)
{ {
initialize(); for (size_t i = 0; i < chunk.getNumRows(); i++)
auto & header = getPort(PortKind::Main).getHeader();
size_t rows = chunk.getNumRows();
auto & columns = chunk.getColumns();
for (size_t i = 0; i < rows; i++)
{ {
ResultsetRow row_packet; ProtocolText::ResultsetRow row_packet(data_types, chunk.getColumns(), i);
for (size_t col = 0; col < columns.size(); ++col)
{
WriteBufferFromOwnString ostr;
header.getByPosition(col).type->serializeAsText(*columns[col], i, ostr, format_settings);
row_packet.appendColumn(std::move(ostr.str()));
}
packet_sender.sendPacket(row_packet); packet_sender.sendPacket(row_packet);
} }
} }

View File

@ -37,6 +37,7 @@ private:
const Context & context; const Context & context;
MySQLProtocol::PacketSender packet_sender; MySQLProtocol::PacketSender packet_sender;
FormatSettings format_settings; FormatSettings format_settings;
DataTypes data_types;
}; };
} }

View File

@ -20,6 +20,15 @@ public:
String getName() const override { return "NativeInputFormatFromNativeBlockInputStream"; } String getName() const override { return "NativeInputFormatFromNativeBlockInputStream"; }
protected: protected:
void resetParser() override
{
IInputFormat::resetParser();
stream->resetParser();
read_prefix = false;
read_suffix = false;
}
Chunk generate() override Chunk generate() override
{ {
/// TODO: do something with totals and extremes. /// TODO: do something with totals and extremes.

View File

@ -62,6 +62,16 @@ namespace DB
return res; return res;
} }
void ORCBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
file_data.clear();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorORC(FormatFactory &factory) void registerInputFormatProcessorORC(FormatFactory &factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormatProcessor(

View File

@ -21,6 +21,8 @@ public:
String getName() const override { return "ORCBlockInputFormat"; } String getName() const override { return "ORCBlockInputFormat"; }
void resetParser() override;
protected: protected:
Chunk generate() override; Chunk generate() override;

View File

@ -63,6 +63,17 @@ namespace DB
return res; return res;
} }
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
file_data.clear();
buffer.reset();
row_group_total = 0;
row_group_current = 0;
}
void registerInputFormatProcessorParquet(FormatFactory &factory) void registerInputFormatProcessorParquet(FormatFactory &factory)
{ {
factory.registerInputFormatProcessor( factory.registerInputFormatProcessor(

View File

@ -18,6 +18,9 @@ class ParquetBlockInputFormat: public IInputFormat
public: public:
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_); ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const Context & context_);
void resetParser() override;
String getName() const override { return "ParquetBlockInputFormat"; } String getName() const override { return "ParquetBlockInputFormat"; }
protected: protected:

View File

@ -65,7 +65,6 @@ void ProtobufRowInputFormat::syncAfterError()
reader.endMessage(true); reader.endMessage(true);
} }
void registerInputFormatProcessorProtobuf(FormatFactory & factory) void registerInputFormatProcessorProtobuf(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("Protobuf", []( factory.registerInputFormatProcessor("Protobuf", [](

View File

@ -197,6 +197,14 @@ void TSKVRowInputFormat::syncAfterError()
} }
void TSKVRowInputFormat::resetParser()
{
IRowInputFormat::resetParser();
read_columns.clear();
seen_columns.clear();
name_buf.clear();
}
void registerInputFormatProcessorTSKV(FormatFactory & factory) void registerInputFormatProcessorTSKV(FormatFactory & factory)
{ {
factory.registerInputFormatProcessor("TSKV", []( factory.registerInputFormatProcessor("TSKV", [](

View File

@ -30,6 +30,8 @@ public:
bool readRow(MutableColumns & columns, RowReadExtension &) override; bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool allowSyncAfterError() const override { return true; } bool allowSyncAfterError() const override { return true; }
void syncAfterError() override; void syncAfterError() override;
void resetParser() override;
private: private:
const FormatSettings format_settings; const FormatSettings format_settings;

View File

@ -341,6 +341,13 @@ void TabSeparatedRowInputFormat::syncAfterError()
skipToUnescapedNextLineOrEOF(in); skipToUnescapedNextLineOrEOF(in);
} }
void TabSeparatedRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
column_indexes_for_input_fields.clear();
read_columns.clear();
columns_to_fill_with_default_values.clear();
}
void registerInputFormatProcessorTabSeparated(FormatFactory & factory) void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
{ {

View File

@ -26,6 +26,8 @@ public:
bool allowSyncAfterError() const override { return true; } bool allowSyncAfterError() const override { return true; }
void syncAfterError() override; void syncAfterError() override;
void resetParser() override;
private: private:
bool with_names; bool with_names;
bool with_types; bool with_types;

View File

@ -496,6 +496,11 @@ void TemplateRowInputFormat::throwUnexpectedEof()
ErrorCodes::CANNOT_READ_ALL_DATA); ErrorCodes::CANNOT_READ_ALL_DATA);
} }
void TemplateRowInputFormat::resetParser()
{
RowInputFormatWithDiagnosticInfo::resetParser();
end_of_stream = false;
}
void registerInputFormatProcessorTemplate(FormatFactory & factory) void registerInputFormatProcessorTemplate(FormatFactory & factory)
{ {

View File

@ -28,6 +28,8 @@ public:
bool allowSyncAfterError() const override; bool allowSyncAfterError() const override;
void syncAfterError() override; void syncAfterError() override;
void resetParser() override;
private: private:
bool deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column); bool deserializeField(const DataTypePtr & type, IColumn & column, size_t file_column);
void skipField(ColumnFormat col_format); void skipField(ColumnFormat col_format);

View File

@ -411,6 +411,13 @@ void ValuesBlockInputFormat::readSuffix()
throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR); throw Exception("Unread data in PeekableReadBuffer will be lost. Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR);
} }
void ValuesBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
// I'm not resetting parser modes here.
// There is a good chance that all messages have the same format.
total_rows = 0;
}
void registerInputFormatProcessorValues(FormatFactory & factory) void registerInputFormatProcessorValues(FormatFactory & factory)
{ {

View File

@ -33,6 +33,8 @@ public:
String getName() const override { return "ValuesBlockInputFormat"; } String getName() const override { return "ValuesBlockInputFormat"; }
void resetParser() override;
const BlockMissingValues & getMissingValues() const override { return block_missing_values; } const BlockMissingValues & getMissingValues() const override { return block_missing_values; }
private: private:

View File

@ -164,4 +164,17 @@ String RowInputFormatWithDiagnosticInfo::alignedName(const String & name, size_t
return name + ", " + std::string(spaces_count, ' '); return name + ", " + std::string(spaces_count, ' ');
} }
void RowInputFormatWithDiagnosticInfo::resetParser()
{
IRowInputFormat::resetParser();
row_num = 0;
bytes_read_at_start_of_buffer_on_current_row = 0;
bytes_read_at_start_of_buffer_on_prev_row = 0;
offset_of_current_row = std::numeric_limits<size_t>::max();
offset_of_prev_row = std::numeric_limits<size_t>::max();
max_length_of_column_name = 0;
max_length_of_data_type_name = 0;
}
} }

View File

@ -16,6 +16,8 @@ public:
String getDiagnosticInfo() override; String getDiagnosticInfo() override;
void resetParser() override;
protected: protected:
void updateDiagnosticInfo(); void updateDiagnosticInfo();
bool deserializeFieldAndPrintDiagnosticInfo(const String & col_name, const DataTypePtr & type, IColumn & column, bool deserializeFieldAndPrintDiagnosticInfo(const String & col_name, const DataTypePtr & type, IColumn & column,

View File

@ -316,6 +316,17 @@ public:
is_finished = true; is_finished = true;
} }
void ALWAYS_INLINE reopen()
{
assumeConnected();
if (!isFinished())
return;
state->setFlags(0, State::IS_FINISHED);
is_finished = false;
}
OutputPort & getOutputPort() OutputPort & getOutputPort()
{ {
assumeConnected(); assumeConnected();

View File

@ -4,6 +4,8 @@
#include <DataStreams/OneBlockInputStream.h> #include <DataStreams/OneBlockInputStream.h>
#include <Formats/FormatFactory.h> #include <Formats/FormatFactory.h>
#include <Storages/Kafka/ReadBufferFromKafkaConsumer.h> #include <Storages/Kafka/ReadBufferFromKafkaConsumer.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
namespace DB namespace DB
{ {
@ -16,6 +18,7 @@ KafkaBlockInputStream::KafkaBlockInputStream(
, commit_in_suffix(commit_in_suffix_) , commit_in_suffix(commit_in_suffix_)
, non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support , non_virtual_header(storage.getSampleBlockNonMaterialized()) /// FIXME: add materialized columns support
, virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"})) , virtual_header(storage.getSampleBlockForColumns({"_topic", "_key", "_offset", "_partition", "_timestamp"}))
{ {
context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV) context.setSetting("input_format_skip_unknown_fields", 1u); // Always skip unknown fields regardless of the context (JSON or TSKV)
context.setSetting("input_format_allow_errors_ratio", 0.); context.setSetting("input_format_allow_errors_ratio", 0.);
@ -23,8 +26,6 @@ KafkaBlockInputStream::KafkaBlockInputStream(
if (!storage.getSchemaName().empty()) if (!storage.getSchemaName().empty())
context.setSetting("format_schema", storage.getSchemaName()); context.setSetting("format_schema", storage.getSchemaName());
virtual_columns = virtual_header.cloneEmptyColumns();
} }
KafkaBlockInputStream::~KafkaBlockInputStream() KafkaBlockInputStream::~KafkaBlockInputStream()
@ -62,7 +63,10 @@ Block KafkaBlockInputStream::readImpl()
if (!buffer) if (!buffer)
return Block(); return Block();
auto read_callback = [this] MutableColumns result_columns = non_virtual_header.cloneEmptyColumns();
MutableColumns virtual_columns = virtual_header.cloneEmptyColumns();
auto read_callback = [&]
{ {
virtual_columns[0]->insert(buffer->currentTopic()); // "topic" virtual_columns[0]->insert(buffer->currentTopic()); // "topic"
virtual_columns[1]->insert(buffer->currentKey()); // "key" virtual_columns[1]->insert(buffer->currentKey()); // "key"
@ -74,69 +78,74 @@ Block KafkaBlockInputStream::readImpl()
virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp" virtual_columns[4]->insert(std::chrono::duration_cast<std::chrono::seconds>(timestamp->get_timestamp()).count()); // "timestamp"
}; };
auto merge_blocks = [] (Block & block1, Block && block2) auto input_format = FormatFactory::instance().getInputFormat(
{
if (!block1)
{
// Need to make sure that resulting block has the same structure
block1 = std::move(block2);
return;
}
if (!block2)
return;
auto columns1 = block1.mutateColumns();
auto columns2 = block2.mutateColumns();
for (size_t i = 0, s = columns1.size(); i < s; ++i)
columns1[i]->insertRangeFrom(*columns2[i], 0, columns2[i]->size());
block1.setColumns(std::move(columns1));
};
auto read_kafka_message = [&, this]
{
Block result;
auto child = FormatFactory::instance().getInput(
storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback); storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, read_callback);
while (auto block = child->read()) InputPort port(input_format->getPort().getHeader(), input_format.get());
{ connect(input_format->getPort(), port);
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns)); port.setNeeded();
virtual_columns = virtual_header.cloneEmptyColumns();
for (const auto & column : virtual_block.getColumnsWithTypeAndName()) auto read_kafka_message = [&]
block.insert(column); {
size_t new_rows = 0;
while (true)
{
auto status = input_format->prepare();
switch (status)
{
case IProcessor::Status::Ready:
input_format->work();
break;
case IProcessor::Status::Finished:
input_format->resetParser();
return new_rows;
case IProcessor::Status::PortFull:
{
auto chunk = port.pull();
new_rows = new_rows + chunk.getNumRows();
/// FIXME: materialize MATERIALIZED columns here. /// FIXME: materialize MATERIALIZED columns here.
merge_blocks(result, std::move(block)); auto columns = chunk.detachColumns();
for (size_t i = 0, s = columns.size(); i < s; ++i)
result_columns[i]->insertRangeFrom(*columns[i], 0, columns[i]->size());
break;
}
case IProcessor::Status::NeedData:
case IProcessor::Status::Async:
case IProcessor::Status::Wait:
case IProcessor::Status::ExpandPipeline:
throw Exception("Source processor returned status " + IProcessor::statusToName(status), ErrorCodes::LOGICAL_ERROR);
}
} }
return result;
}; };
Block single_block; size_t total_rows = 0;
UInt64 total_rows = 0;
while (total_rows < max_block_size) while (total_rows < max_block_size)
{ {
auto new_block = read_kafka_message(); auto new_rows = read_kafka_message();
auto new_rows = new_block.rows(); total_rows = total_rows + new_rows;
total_rows += new_rows;
merge_blocks(single_block, std::move(new_block));
buffer->allowNext(); buffer->allowNext();
if (!new_rows || !checkTimeLimit()) if (!new_rows || !checkTimeLimit())
break; break;
} }
if (!single_block) if (total_rows == 0)
return Block(); return Block();
auto result_block = non_virtual_header.cloneWithColumns(std::move(result_columns));
auto virtual_block = virtual_header.cloneWithColumns(std::move(virtual_columns));
for (const auto & column : virtual_block.getColumnsWithTypeAndName())
result_block.insert(column);
return ConvertingBlockInputStream( return ConvertingBlockInputStream(
context, context,
std::make_shared<OneBlockInputStream>(single_block), std::make_shared<OneBlockInputStream>(result_block),
getHeader(), getHeader(),
ConvertingBlockInputStream::MatchColumnsMode::Name) ConvertingBlockInputStream::MatchColumnsMode::Name)
.read(); .read();

View File

@ -33,9 +33,7 @@ private:
UInt64 max_block_size; UInt64 max_block_size;
ConsumerBufferPtr buffer; ConsumerBufferPtr buffer;
MutableColumns virtual_columns;
bool broken = true, claimed = false, commit_in_suffix; bool broken = true, claimed = false, commit_in_suffix;
const Block non_virtual_header, virtual_header; const Block non_virtual_header, virtual_header;
}; };

View File

@ -61,9 +61,12 @@ void BackgroundProcessingPoolTaskInfo::wake()
} }
BackgroundProcessingPool::BackgroundProcessingPool(int size_) : size(size_) BackgroundProcessingPool::BackgroundProcessingPool(int size_, const char * log_name, const char * thread_name_)
: size(size_)
, thread_name(thread_name_)
{ {
LOG_INFO(&Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with " << size << " threads"); logger = &Logger::get(log_name);
LOG_INFO(logger, "Create " << log_name << " with " << size << " threads");
threads.resize(size); threads.resize(size);
for (auto & thread : threads) for (auto & thread : threads)
@ -122,7 +125,7 @@ BackgroundProcessingPool::~BackgroundProcessingPool()
void BackgroundProcessingPool::threadFunction() void BackgroundProcessingPool::threadFunction()
{ {
setThreadName("BackgrProcPool"); setThreadName(thread_name);
{ {
std::lock_guard lock(tasks_mutex); std::lock_guard lock(tasks_mutex);

View File

@ -46,7 +46,9 @@ public:
using TaskHandle = std::shared_ptr<TaskInfo>; using TaskHandle = std::shared_ptr<TaskInfo>;
BackgroundProcessingPool(int size_); BackgroundProcessingPool(int size_,
const char * log_name = "BackgroundProcessingPool",
const char * thread_name_ = "BackgrProcPool");
size_t getNumberOfThreads() const size_t getNumberOfThreads() const
{ {
@ -67,6 +69,8 @@ protected:
using Threads = std::vector<ThreadFromGlobalPool>; using Threads = std::vector<ThreadFromGlobalPool>;
const size_t size; const size_t size;
const char * thread_name;
Poco::Logger * logger;
Tasks tasks; /// Ordered in priority. Tasks tasks; /// Ordered in priority.
std::mutex tasks_mutex; std::mutex tasks_mutex;

View File

@ -3473,6 +3473,11 @@ bool MergeTreeData::selectPartsAndMove()
return moveParts(std::move(moving_tagger)); return moveParts(std::move(moving_tagger));
} }
bool MergeTreeData::areBackgroundMovesNeeded() const
{
return storage_policy->getVolumes().size() > 1;
}
bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space) bool MergeTreeData::movePartsToSpace(const DataPartsVector & parts, DiskSpace::SpacePtr space)
{ {
if (parts_mover.moves_blocker.isCancelled()) if (parts_mover.moves_blocker.isCancelled())

View File

@ -939,6 +939,8 @@ protected:
/// Selects parts for move and moves them, used in background process /// Selects parts for move and moves them, used in background process
bool selectPartsAndMove(); bool selectPartsAndMove();
bool areBackgroundMovesNeeded() const;
private: private:
/// RAII Wrapper for atomic work with currently moving parts /// RAII Wrapper for atomic work with currently moving parts
/// Acuire them in constructor and remove them in destructor /// Acuire them in constructor and remove them in destructor

View File

@ -214,7 +214,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
} }
catch (const Coordination::Exception & e) catch (const Coordination::Exception & e)
{ {
LOG_ERROR(log, "Couldn't start replication: " << e.what() << ", " << e.displayText() << ", stack trace:\n" << e.getStackTrace().toString()); LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
return false; return false;
} }
catch (const Exception & e) catch (const Exception & e)
@ -222,7 +222,7 @@ bool ReplicatedMergeTreeRestartingThread::tryStartup()
if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE) if (e.code() != ErrorCodes::REPLICA_IS_ALREADY_ACTIVE)
throw; throw;
LOG_ERROR(log, "Couldn't start replication: " << e.what() << ", " << e.displayText() << ", stack trace:\n" << e.getStackTrace().toString()); LOG_ERROR(log, "Couldn't start replication: " << e.what() << ". " << DB::getCurrentExceptionMessage(true));
return false; return false;
} }
} }

View File

@ -217,12 +217,11 @@ MergeTreeData::DataPart::Checksums checkDataPart(
MergeTreeData::DataPart::Checksums checksums_data; MergeTreeData::DataPart::Checksums checksums_data;
size_t marks_in_primary_key = 0; size_t marks_in_primary_key = 0;
if (!primary_key_data_types.empty())
{ {
ReadBufferFromFile file_buf(path + "primary.idx"); ReadBufferFromFile file_buf(path + "primary.idx");
HashingReadBuffer hashing_buf(file_buf); HashingReadBuffer hashing_buf(file_buf);
if (!primary_key_data_types.empty())
{
size_t key_size = primary_key_data_types.size(); size_t key_size = primary_key_data_types.size();
MutableColumns tmp_columns(key_size); MutableColumns tmp_columns(key_size);
@ -238,11 +237,6 @@ MergeTreeData::DataPart::Checksums checkDataPart(
for (size_t j = 0; j < key_size; ++j) for (size_t j = 0; j < key_size; ++j)
primary_key_data_types[j]->deserializeBinary(*tmp_columns[j].get(), hashing_buf); primary_key_data_types[j]->deserializeBinary(*tmp_columns[j].get(), hashing_buf);
} }
}
else
{
hashing_buf.tryIgnore(std::numeric_limits<size_t>::max());
}
size_t primary_idx_size = hashing_buf.count(); size_t primary_idx_size = hashing_buf.count();

View File

@ -15,7 +15,7 @@ namespace DB
MergeTreeData::DataPart::Checksums checkDataPart( MergeTreeData::DataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part, MergeTreeData::DataPartPtr data_part,
bool require_checksums, bool require_checksums,
const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array. const DataTypes & primary_key_data_types,
const MergeTreeIndices & indices = {}, /// Check skip indices const MergeTreeIndices & indices = {}, /// Check skip indices
std::function<bool()> is_cancelled = []{ return false; }); std::function<bool()> is_cancelled = []{ return false; });
@ -24,7 +24,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
const MergeTreeIndexGranularity & index_granularity, const MergeTreeIndexGranularity & index_granularity,
const String & marks_file_extension, const String & marks_file_extension,
bool require_checksums, bool require_checksums,
const DataTypes & primary_key_data_types, /// Check the primary key. If it is not necessary, pass an empty array. const DataTypes & primary_key_data_types,
const MergeTreeIndices & indices = {}, /// Check skip indices const MergeTreeIndices & indices = {}, /// Check skip indices
std::function<bool()> is_cancelled = []{ return false; }); std::function<bool()> is_cancelled = []{ return false; });
} }

View File

@ -16,6 +16,7 @@
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h> #include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h> #include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <Common/escapeForFileName.h> #include <Common/escapeForFileName.h>
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
@ -254,7 +255,7 @@ BlockInputStreams StorageFile::read(
const Context & context, const Context & context,
QueryProcessingStage::Enum /*processed_stage*/, QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size, size_t max_block_size,
unsigned /*num_streams*/) unsigned num_streams)
{ {
const ColumnsDescription & columns_ = getColumns(); const ColumnsDescription & columns_ = getColumns();
auto column_defaults = columns_.getDefaults(); auto column_defaults = columns_.getDefaults();
@ -268,7 +269,7 @@ BlockInputStreams StorageFile::read(
std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method)); std::static_pointer_cast<StorageFile>(shared_from_this()), context, max_block_size, file_path, IStorage::chooseCompressionMethod(file_path, compression_method));
blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context)); blocks_input.push_back(column_defaults.empty() ? cur_block : std::make_shared<AddingDefaultsBlockInputStream>(cur_block, column_defaults, context));
} }
return blocks_input; return narrowBlockInputStreams(blocks_input, num_streams);
} }

View File

@ -17,6 +17,8 @@
#include <DataStreams/UnionBlockInputStream.h> #include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h> #include <DataStreams/OwningBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h> #include <DataStreams/IBlockInputStream.h>
#include <DataStreams/narrowBlockInputStreams.h>
#include <Common/parseGlobs.h> #include <Common/parseGlobs.h>
#include <Poco/URI.h> #include <Poco/URI.h>
#include <re2/re2.h> #include <re2/re2.h>
@ -196,7 +198,7 @@ BlockInputStreams StorageHDFS::read(
const Context & context_, const Context & context_,
QueryProcessingStage::Enum /*processed_stage*/, QueryProcessingStage::Enum /*processed_stage*/,
size_t max_block_size, size_t max_block_size,
unsigned /*num_streams*/) unsigned num_streams)
{ {
const size_t begin_of_path = uri.find('/', uri.find("//") + 2); const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
const String path_from_uri = uri.substr(begin_of_path); const String path_from_uri = uri.substr(begin_of_path);
@ -213,7 +215,7 @@ BlockInputStreams StorageHDFS::read(
max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method))); max_block_size, IStorage::chooseCompressionMethod(res_path, compression_method)));
} }
return result; return narrowBlockInputStreams(result, num_streams);
} }
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &)

View File

@ -99,7 +99,8 @@ void StorageMergeTree::startup()
/// NOTE background task will also do the above cleanups periodically. /// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup.restart(); time_after_previous_cleanup.restart();
merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); }); merging_mutating_task_handle = global_context.getBackgroundPool().addTask([this] { return mergeMutateTask(); });
moving_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); }); if (areBackgroundMovesNeeded())
moving_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
} }
@ -115,7 +116,7 @@ void StorageMergeTree::shutdown()
global_context.getBackgroundPool().removeTask(merging_mutating_task_handle); global_context.getBackgroundPool().removeTask(merging_mutating_task_handle);
if (moving_task_handle) if (moving_task_handle)
global_context.getBackgroundPool().removeTask(moving_task_handle); global_context.getBackgroundMovePool().removeTask(moving_task_handle);
} }

View File

@ -2878,7 +2878,8 @@ void StorageReplicatedMergeTree::startup()
data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler()); data_parts_exchange_endpoint->getId(replica_path), data_parts_exchange_endpoint, global_context.getInterserverIOHandler());
queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); }); queue_task_handle = global_context.getBackgroundPool().addTask([this] { return queueTask(); });
move_parts_task_handle = global_context.getBackgroundPool().addTask([this] { return movePartsTask(); }); if (areBackgroundMovesNeeded())
move_parts_task_handle = global_context.getBackgroundMovePool().addTask([this] { return movePartsTask(); });
/// In this thread replica will be activated. /// In this thread replica will be activated.
restarting_thread.start(); restarting_thread.start();
@ -2902,7 +2903,7 @@ void StorageReplicatedMergeTree::shutdown()
queue_task_handle.reset(); queue_task_handle.reset();
if (move_parts_task_handle) if (move_parts_task_handle)
global_context.getBackgroundPool().removeTask(move_parts_task_handle); global_context.getBackgroundMovePool().removeTask(move_parts_task_handle);
move_parts_task_handle.reset(); move_parts_task_handle.reset();
if (data_parts_exchange_endpoint_holder) if (data_parts_exchange_endpoint_holder)

View File

@ -36,10 +36,12 @@ NamesAndTypesList StorageSystemDictionaries::getNamesAndTypes()
{"element_count", std::make_shared<DataTypeUInt64>()}, {"element_count", std::make_shared<DataTypeUInt64>()},
{"load_factor", std::make_shared<DataTypeFloat64>()}, {"load_factor", std::make_shared<DataTypeFloat64>()},
{"source", std::make_shared<DataTypeString>()}, {"source", std::make_shared<DataTypeString>()},
{"lifetime_min", std::make_shared<DataTypeUInt64>()},
{"lifetime_max", std::make_shared<DataTypeUInt64>()},
{"loading_start_time", std::make_shared<DataTypeDateTime>()}, {"loading_start_time", std::make_shared<DataTypeDateTime>()},
{"loading_duration", std::make_shared<DataTypeFloat32>()}, {"loading_duration", std::make_shared<DataTypeFloat32>()},
//{ "creation_time", std::make_shared<DataTypeDateTime>() }, //{ "creation_time", std::make_shared<DataTypeDateTime>() },
{"last_exception", std::make_shared<DataTypeString>()}, {"last_exception", std::make_shared<DataTypeString>()}
}; };
} }
@ -77,12 +79,15 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(dict_ptr->getLoadFactor()); res_columns[i++]->insert(dict_ptr->getLoadFactor());
res_columns[i++]->insert(dict_ptr->getSource()->toString()); res_columns[i++]->insert(dict_ptr->getSource()->toString());
const auto & lifetime = dict_ptr->getLifetime();
res_columns[i++]->insert(lifetime.min_sec);
res_columns[i++]->insert(lifetime.max_sec);
if (!last_exception) if (!last_exception)
last_exception = dict_ptr->getLastException(); last_exception = dict_ptr->getLastException();
} }
else else
{ {
for (size_t j = 0; j != 10; ++j) for (size_t j = 0; j != 12; ++j) // Number of empty fields if dict_ptr is null
res_columns[i++]->insertDefault(); res_columns[i++]->insertDefault();
} }
@ -93,7 +98,9 @@ void StorageSystemDictionaries::fillData(MutableColumns & res_columns, const Con
res_columns[i++]->insert(getExceptionMessage(last_exception, false)); res_columns[i++]->insert(getExceptionMessage(last_exception, false));
else else
res_columns[i++]->insertDefault(); res_columns[i++]->insertDefault();
} }
} }
} }

View File

@ -4,9 +4,6 @@ target_link_libraries (system_numbers PRIVATE dbms clickhouse_storages_system cl
add_executable (storage_log storage_log.cpp) add_executable (storage_log storage_log.cpp)
target_link_libraries (storage_log PRIVATE dbms) target_link_libraries (storage_log PRIVATE dbms)
add_executable (part_checker part_checker.cpp)
target_link_libraries (part_checker PRIVATE dbms)
add_executable (part_name part_name.cpp) add_executable (part_name part_name.cpp)
target_link_libraries (part_name PRIVATE dbms) target_link_libraries (part_name PRIVATE dbms)

View File

@ -1,80 +0,0 @@
#include <Poco/ConsoleChannel.h>
#include <Poco/DirectoryIterator.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Common/Exception.h>
using namespace DB;
Poco::Path getMarksFile(const std::string & part_path)
{
Poco::DirectoryIterator it(part_path);
Poco::DirectoryIterator end;
while (it != end)
{
Poco::Path p(it.path());
auto extension = p.getExtension();
if (extension == "mrk2" || extension == "mrk")
return p;
++it;
}
throw Exception("Cannot find any mark file in directory " + part_path, DB::ErrorCodes::METRIKA_OTHER_ERROR);
}
MergeTreeIndexGranularity readGranularity(const Poco::Path & mrk_file_path, size_t fixed_granularity)
{
MergeTreeIndexGranularity result;
auto extension = mrk_file_path.getExtension();
DB::ReadBufferFromFile mrk_in(mrk_file_path.toString());
for (size_t mark_num = 0; !mrk_in.eof(); ++mark_num)
{
UInt64 offset_in_compressed_file = 0;
UInt64 offset_in_decompressed_block = 0;
DB::readBinary(offset_in_compressed_file, mrk_in);
DB::readBinary(offset_in_decompressed_block, mrk_in);
UInt64 index_granularity_rows = 0;
if (extension == "mrk2")
DB::readBinary(index_granularity_rows, mrk_in);
else
index_granularity_rows = fixed_granularity;
result.appendMark(index_granularity_rows);
}
return result;
}
int main(int argc, char ** argv)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel = new Poco::ConsoleChannel(std::cerr);
Logger::root().setChannel(channel);
Logger::root().setLevel("trace");
if (argc != 4)
{
std::cerr << "Usage: " << argv[0] << " path strict index_granularity" << std::endl;
return 1;
}
try
{
std::string full_path{argv[1]};
auto mrk_file_path = getMarksFile(full_path);
size_t fixed_granularity{parse<size_t>(argv[3])};
auto adaptive_granularity = readGranularity(mrk_file_path, fixed_granularity);
auto marks_file_extension = "." + mrk_file_path.getExtension();
bool require_checksums = parse<bool>(argv[2]);
checkDataPart(full_path, adaptive_granularity, marks_file_extension, require_checksums, {});
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
return 0;
}

View File

@ -1,7 +1,7 @@
Columns: Columns:
a a
Column types: Column types:
a BINARY a BIGINT
Result: Result:
0 0
1 1
@ -10,7 +10,7 @@ name
a a
Column types: Column types:
name BINARY name BINARY
a BINARY a TINYINT
Result: Result:
tables 1 tables 1
Columns: Columns:
@ -18,6 +18,6 @@ a
b b
Column types: Column types:
a BINARY a BINARY
b BINARY b TINYINT
Result: Result:
тест 1 тест 1

View File

@ -2,5 +2,7 @@ version: '2.2'
services: services:
mysql1: mysql1:
image: mysql:5.7 image: mysql:5.7
# rewriting default command, because starting server is unnecessary restart: always
command: sleep infinity environment:
MYSQL_ALLOW_EMPTY_PASSWORD: 1
command: --federated --socket /var/run/mysqld/mysqld.sock

View File

@ -108,8 +108,52 @@ def test_mysql_client(mysql_client, server_address):
assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', 'tmp_column', '0', '1', '']) assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', 'tmp_column', '0', '1', ''])
def test_mysql_federated(mysql_client, server_address):
node.query('''DROP DATABASE IF EXISTS mysql_federated''', settings={"password": "123"})
node.query('''CREATE DATABASE mysql_federated''', settings={"password": "123"})
node.query('''CREATE TABLE mysql_federated.test (col UInt32) ENGINE = Log''', settings={"password": "123"})
node.query('''INSERT INTO mysql_federated.test VALUES (0), (1), (5)''', settings={"password": "123"})
code, (_, stderr) = mysql_client.exec_run('''
mysql
-e "DROP SERVER IF EXISTS clickhouse;"
-e "CREATE SERVER clickhouse FOREIGN DATA WRAPPER mysql OPTIONS (USER 'default', PASSWORD '123', HOST '{host}', PORT {port}, DATABASE 'mysql_federated');"
-e "DROP DATABASE IF EXISTS mysql_federated;"
-e "CREATE DATABASE mysql_federated;"
'''.format(host=server_address, port=server_port), demux=True)
assert code == 0
code, (stdout, stderr) = mysql_client.exec_run('''
mysql
-e "CREATE TABLE mysql_federated.test(`col` int UNSIGNED) ENGINE=FEDERATED CONNECTION='clickhouse';"
-e "SELECT * FROM mysql_federated.test ORDER BY col;"
'''.format(host=server_address, port=server_port), demux=True)
assert stdout == '\n'.join(['col', '0', '1', '5', ''])
code, (stdout, stderr) = mysql_client.exec_run('''
mysql
-e "INSERT INTO mysql_federated.test VALUES (0), (1), (5);"
-e "SELECT * FROM mysql_federated.test ORDER BY col;"
'''.format(host=server_address, port=server_port), demux=True)
assert stdout == '\n'.join(['col', '0', '0', '1', '1', '5', '5', ''])
def test_python_client(server_address): def test_python_client(server_address):
client = pymysql.connections.Connection(host=server_address, user='user_with_double_sha1', password='abacaba', database='default', port=server_port)
with pytest.raises(pymysql.InternalError) as exc_info:
client.query('select name from tables')
assert exc_info.value.args == (60, "Table default.tables doesn't exist.")
cursor = client.cursor(pymysql.cursors.DictCursor)
cursor.execute("select 1 as a, 'тест' as b")
assert cursor.fetchall() == [{'a': 1, 'b': 'тест'}]
with pytest.raises(pymysql.InternalError) as exc_info: with pytest.raises(pymysql.InternalError) as exc_info:
pymysql.connections.Connection(host=server_address, user='default', password='abacab', database='default', port=server_port) pymysql.connections.Connection(host=server_address, user='default', password='abacab', database='default', port=server_port)
@ -124,7 +168,7 @@ def test_python_client(server_address):
cursor = client.cursor(pymysql.cursors.DictCursor) cursor = client.cursor(pymysql.cursors.DictCursor)
cursor.execute("select 1 as a, 'тест' as b") cursor.execute("select 1 as a, 'тест' as b")
assert cursor.fetchall() == [{'a': '1', 'b': 'тест'}] assert cursor.fetchall() == [{'a': 1, 'b': 'тест'}]
client.select_db('system') client.select_db('system')
@ -140,11 +184,14 @@ def test_python_client(server_address):
cursor.execute("INSERT INTO table1 VALUES (1), (3)") cursor.execute("INSERT INTO table1 VALUES (1), (3)")
cursor.execute("INSERT INTO table1 VALUES (1), (4)") cursor.execute("INSERT INTO table1 VALUES (1), (4)")
cursor.execute("SELECT * FROM table1 ORDER BY a") cursor.execute("SELECT * FROM table1 ORDER BY a")
assert cursor.fetchall() == [{'a': '1'}, {'a': '1'}, {'a': '3'}, {'a': '4'}] assert cursor.fetchall() == [{'a': 1}, {'a': 1}, {'a': 3}, {'a': 4}]
def test_golang_client(server_address, golang_container): def test_golang_client(server_address, golang_container):
# type: (str, Container) -> None # type: (str, Container) -> None
with open(os.path.join(SCRIPT_DIR, 'clients', 'golang', '0.reference')) as fp:
reference = fp.read()
code, (stdout, stderr) = golang_container.exec_run('./main --host {host} --port {port} --user default --password 123 --database ' code, (stdout, stderr) = golang_container.exec_run('./main --host {host} --port {port} --user default --password 123 --database '
'abc'.format(host=server_address, port=server_port), demux=True) 'abc'.format(host=server_address, port=server_port), demux=True)
@ -155,9 +202,11 @@ def test_golang_client(server_address, golang_container):
'default'.format(host=server_address, port=server_port), demux=True) 'default'.format(host=server_address, port=server_port), demux=True)
assert code == 0 assert code == 0
assert stdout == reference
with open(os.path.join(SCRIPT_DIR, 'clients', 'golang', '0.reference')) as fp: code, (stdout, stderr) = golang_container.exec_run('./main --host {host} --port {port} --user user_with_double_sha1 --password abacaba --database '
reference = fp.read() 'default'.format(host=server_address, port=server_port), demux=True)
assert code == 0
assert stdout == reference assert stdout == reference
@ -171,6 +220,14 @@ def test_php_client(server_address, php_container):
assert code == 0 assert code == 0
assert stdout == 'tables\n' assert stdout == 'tables\n'
code, (stdout, stderr) = php_container.exec_run('php -f test.php {host} {port} user_with_double_sha1 abacaba'.format(host=server_address, port=server_port), demux=True)
assert code == 0
assert stdout == 'tables\n'
code, (stdout, stderr) = php_container.exec_run('php -f test_ssl.php {host} {port} user_with_double_sha1 abacaba'.format(host=server_address, port=server_port), demux=True)
assert code == 0
assert stdout == 'tables\n'
def test_mysqljs_client(server_address, nodejs_container): def test_mysqljs_client(server_address, nodejs_container):
code, (_, stderr) = nodejs_container.exec_run('node test.js {host} {port} default 123'.format(host=server_address, port=server_port), demux=True) code, (_, stderr) = nodejs_container.exec_run('node test.js {host} {port} default 123'.format(host=server_address, port=server_port), demux=True)

View File

@ -22,6 +22,8 @@ You can use `substitions`, `create`, `fill` and `drop` queries to prepare test.
Take into account, that these tests will run in CI which consists of 56-cores and 512 RAM machines. Queries will be executed much faster than on local laptop. Take into account, that these tests will run in CI which consists of 56-cores and 512 RAM machines. Queries will be executed much faster than on local laptop.
If your test continued more than 10 minutes, please, add tag `long` to have an opportunity to run all tests and skip long ones.
### How to run performance test ### How to run performance test
You have to run clickhouse-server and after you can start testing: You have to run clickhouse-server and after you can start testing:

View File

@ -0,0 +1,16 @@
<test>
<type>once</type>
<stop_conditions>
<any_of>
<average_speed_not_changing_for_ms>1000</average_speed_not_changing_for_ms>
<total_time_ms>10000</total_time_ms>
</any_of>
</stop_conditions>
<!-- lon [-180; 180], lat [-90; 90] -->
<query>SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance((rand() % 360) * 1. - 180, (number % 150) * 1.2 - 90, (number % 360) + toFloat64(rand()) / 4294967296 - 180, (rand() % 180) * 1. - 90))</query>
<!-- 55.755830, 37.617780 is center of Moscow -->
<query>SELECT count() FROM system.numbers WHERE NOT ignore(greatCircleDistance(55. + toFloat64(rand()) / 4294967296, 37. + toFloat64(rand()) / 4294967296, 55. + toFloat64(rand()) / 4294967296, 37. + toFloat64(rand()) / 4294967296))</query>
</test>

View File

@ -1,3 +1,7 @@
343417
342558
0 0
1
1
1
1
1
1

Some files were not shown because too many files have changed in this diff Show More