Merge branch 'master' into fastops
Commit b9d39f4882

contrib/hyperscan (vendored submodule)
@@ -1 +1 @@
-Subproject commit 01e6b83f9fbdb4020cd68a5287bf3a0471eeb272
+Subproject commit 3058c9c20cba3accdf92544d8513a26240c4ff70
dbms/src/Common/ThreadPool.cpp

@@ -1,7 +1,6 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>

#include <iostream>
#include <type_traits>
@@ -34,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threads, size_t queue_size)
 {
 }

+template <typename Thread>
+void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
+{
+    std::lock_guard lock(mutex);
+    max_threads = value;
+}
+
+template <typename Thread>
+void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
+{
+    std::lock_guard lock(mutex);
+    max_free_threads = value;
+}
+
+template <typename Thread>
+void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
+{
+    std::lock_guard lock(mutex);
+    queue_size = value;
+}
+
+
 template <typename Thread>
 template <typename ReturnType>
 ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
@@ -59,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)

     auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };

-    if (wait_microseconds)
+    if (wait_microseconds)  /// Check the optional. The condition is true whenever the optional is set, even if its value is zero.
     {
         if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
             return on_error();

@@ -83,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
         catch (...)
         {
             threads.pop_front();
+
+            /// Remove the job and return an error to the caller.
+            /// Note that if we have allocated at least one thread, we could continue
+            /// (one thread is enough to process all jobs).
+            /// But this condition indicates an error nevertheless, and it is better to refuse.
+
+            jobs.pop();
+            --scheduled_jobs;
+            return on_error();
         }
     }
 }
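The comment on the `if (wait_microseconds)` branch deserves a word: `std::optional`'s boolean conversion tests whether a value is present, not what that value is, so a caller passing a zero timeout still takes the timed-wait path (where `wait_for` with a zero duration degenerates into a single non-blocking check of the predicate). A minimal standalone illustration of that semantic:

#include <cstdint>
#include <iostream>
#include <optional>

int main()
{
    std::optional<uint64_t> wait_microseconds = 0;  /// Set, but holding zero.

    /// operator bool checks presence, not the contained value: this prints "set, value = 0".
    if (wait_microseconds)
        std::cout << "set, value = " << *wait_microseconds << "\n";
    else
        std::cout << "not set\n";
}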
dbms/src/Common/ThreadPool.h

@@ -60,14 +60,18 @@ public:
     /// Returns number of running and scheduled jobs.
     size_t active() const;

+    void setMaxThreads(size_t value);
+    void setMaxFreeThreads(size_t value);
+    void setQueueSize(size_t value);
+
 private:
     mutable std::mutex mutex;
     std::condition_variable job_finished;
     std::condition_variable new_job_or_shutdown;

-    const size_t max_threads;
-    const size_t max_free_threads;
-    const size_t queue_size;
+    size_t max_threads;
+    size_t max_free_threads;
+    size_t queue_size;

     size_t scheduled_jobs = 0;
     bool shutdown = false;
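The three limit members drop their const qualifier so the pool can be resized after construction; each new setter takes the pool's mutex, the same lock scheduleImpl already holds when it reads them. A hypothetical configuration hook (not actual ClickHouse code) sketches the intended use:

#include <cstddef>

struct PoolSettings
{
    std::size_t max_threads;
    std::size_t max_free_threads;
    std::size_t queue_size;
};

/// Apply settings to any pool exposing the three setters added above.
/// Each setter locks the pool's mutex internally, so each update is
/// thread-safe on its own (though the three are not atomic as a group).
template <typename Pool>
void applyPoolSettings(Pool & pool, const PoolSettings & settings)
{
    pool.setMaxThreads(settings.max_threads);
    pool.setMaxFreeThreads(settings.max_free_threads);
    pool.setQueueSize(settings.queue_size);
}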
dbms/src/Common/tests/gtest_thread_pool_global_full.cpp (new file)
@@ -0,0 +1,89 @@
#include <atomic>

#include <Common/ThreadPool.h>

#include <gtest/gtest.h>


/// Test what happens if a local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if the local ThreadPool cannot allocate even a single thread,
/// the job will be scheduled but never executed.


TEST(ThreadPool, GlobalFull1)
{
    GlobalThreadPool & global_pool = GlobalThreadPool::instance();

    static constexpr size_t capacity = 5;

    global_pool.setMaxThreads(capacity);
    global_pool.setMaxFreeThreads(1);
    global_pool.setQueueSize(capacity);
    global_pool.wait();

    std::atomic<size_t> counter = 0;
    static constexpr size_t num_jobs = capacity + 1;

    auto func = [&] { ++counter; while (counter != num_jobs) {} };

    ThreadPool pool(num_jobs);

    for (size_t i = 0; i < capacity; ++i)
        pool.schedule(func);

    for (size_t i = capacity; i < num_jobs; ++i)
    {
        EXPECT_THROW(pool.schedule(func), DB::Exception);
        ++counter;
    }

    pool.wait();
    EXPECT_EQ(counter, num_jobs);

    global_pool.setMaxThreads(10000);
    global_pool.setMaxFreeThreads(1000);
    global_pool.setQueueSize(10000);
}


TEST(ThreadPool, GlobalFull2)
{
    GlobalThreadPool & global_pool = GlobalThreadPool::instance();

    static constexpr size_t capacity = 5;

    global_pool.setMaxThreads(capacity);
    global_pool.setMaxFreeThreads(1);
    global_pool.setQueueSize(capacity);

    /// The ThreadFromGlobalPool objects of the local thread pools from the previous test case have exited,
    /// but their threads in global_pool may not have finished yet (they still have to exit).
    /// If we do not wait here, we can get a "Cannot schedule a task" exception earlier than we expect in this test.
    global_pool.wait();

    std::atomic<size_t> counter = 0;
    auto func = [&] { ++counter; while (counter != capacity + 1) {} };

    ThreadPool pool(capacity, 0, capacity);
    for (size_t i = 0; i < capacity; ++i)
        pool.schedule(func);

    ThreadPool another_pool(1);
    EXPECT_THROW(another_pool.schedule(func), DB::Exception);

    ++counter;

    pool.wait();

    global_pool.wait();

    for (size_t i = 0; i < capacity; ++i)
        another_pool.schedule([&] { ++counter; });

    another_pool.wait();
    EXPECT_EQ(counter, capacity * 2 + 1);

    global_pool.setMaxThreads(10000);
    global_pool.setMaxFreeThreads(1000);
    global_pool.setQueueSize(10000);
}
dbms/src/Functions/FunctionsConsistentHashing.h

@@ -29,11 +29,12 @@ struct YandexConsistentHashImpl
     static constexpr auto name = "yandexConsistentHash";

     using HashType = UInt64;
-    /// Actually it supports UInt64, but it is effective only if n < 65536
-    using ResultType = UInt32;
-    using BucketsCountType = ResultType;
+    /// Actually it supports UInt64, but it is efficient only if n <= 32768
+    using ResultType = UInt16;
+    using BucketsType = ResultType;
+    static constexpr auto max_buckets = 32768;

-    static inline ResultType apply(UInt64 hash, BucketsCountType n)
+    static inline ResultType apply(UInt64 hash, BucketsType n)
     {
         return ConsistentHashing(hash, n);
     }

@@ -59,9 +60,10 @@ struct JumpConsistentHashImpl

     using HashType = UInt64;
     using ResultType = Int32;
-    using BucketsCountType = ResultType;
+    using BucketsType = ResultType;
+    static constexpr auto max_buckets = static_cast<UInt64>(std::numeric_limits<BucketsType>::max());

-    static inline ResultType apply(UInt64 hash, BucketsCountType n)
+    static inline ResultType apply(UInt64 hash, BucketsType n)
     {
         return JumpConsistentHash(hash, n);
     }

@@ -74,9 +76,10 @@ struct SumburConsistentHashImpl

     using HashType = UInt32;
     using ResultType = UInt16;
-    using BucketsCountType = ResultType;
+    using BucketsType = ResultType;
+    static constexpr auto max_buckets = static_cast<UInt64>(std::numeric_limits<BucketsType>::max());

-    static inline ResultType apply(HashType hash, BucketsCountType n)
+    static inline ResultType apply(HashType hash, BucketsType n)
     {
         return static_cast<ResultType>(sumburConsistentHash(hash, n));
     }

@@ -143,8 +146,7 @@ public:
 private:
     using HashType = typename Impl::HashType;
     using ResultType = typename Impl::ResultType;
-    using BucketsType = typename Impl::BucketsCountType;
-    static constexpr auto max_buckets = static_cast<UInt64>(std::numeric_limits<BucketsType>::max());
+    using BucketsType = typename Impl::BucketsType;

     template <typename T>
     inline BucketsType checkBucketsRange(T buckets)

@@ -153,10 +155,9 @@ private:
             throw Exception(
                 "The second argument of function " + getName() + " (number of buckets) must be positive number", ErrorCodes::BAD_ARGUMENTS);

-        if (unlikely(static_cast<UInt64>(buckets) > max_buckets))
-            throw Exception("The value of the second argument of function " + getName() + " (number of buckets) is not fit to "
-                + DataTypeNumber<BucketsType>().getName(),
-                ErrorCodes::BAD_ARGUMENTS);
+        if (unlikely(static_cast<UInt64>(buckets) > Impl::max_buckets))
+            throw Exception("The value of the second argument of function " + getName() + " (number of buckets) must not be greater than "
+                + std::to_string(Impl::max_buckets), ErrorCodes::BAD_ARGUMENTS);

         return static_cast<BucketsType>(buckets);
     }
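JumpConsistentHashImpl above delegates to JumpConsistentHash, the "jump" algorithm of Lamping and Veach ("A Fast, Minimal Memory, Consistent Hash Algorithm", 2014), which is compact enough to quote. The sketch below is the reference implementation from their paper, not ClickHouse's own copy, though the two should agree:

#include <cstdint>

/// Reference implementation of jump consistent hash (Lamping & Veach, 2014).
/// Maps a 64-bit key to one of num_buckets buckets in O(log n) time and O(1)
/// memory; when num_buckets grows from n to n + 1, only ~1/(n + 1) of keys move.
static int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets)
{
    int64_t b = -1;
    int64_t j = 0;
    while (j < num_buckets)
    {
        b = j;
        key = key * 2862933555777941757ULL + 1;
        j = static_cast<int64_t>((b + 1) * (double(1LL << 31) / double((key >> 33) + 1)));
    }
    return static_cast<int32_t>(b);
}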
dbms/src/Interpreters/Aggregator.h

@@ -155,7 +155,9 @@ using AggregatedDataWithNullableStringKeyTwoLevel = AggregationDataWithNullKeyTwoLevel


 /// For the case where there is one numeric key.
-template <typename FieldType, typename TData> /// UInt8/16/32/64 for any type with corresponding bit width.
+/// FieldType is UInt8/16/32/64 for any type with corresponding bit width.
+template <typename FieldType, typename TData,
+    bool consecutive_keys_optimization = true>
 struct AggregationMethodOneNumber
 {
     using Data = TData;

@@ -172,7 +174,8 @@ struct AggregationMethodOneNumber
     AggregationMethodOneNumber(const Other & other) : data(other.data) {}

     /// To use one `Method` in different threads, use different `State`.
-    using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type, Mapped, FieldType>;
+    using State = ColumnsHashing::HashMethodOneNumber<typename Data::value_type,
+        Mapped, FieldType, consecutive_keys_optimization>;

     /// Use optimization for low cardinality.
     static const bool low_cardinality_optimization = false;

@@ -421,8 +424,10 @@ struct AggregatedDataVariants : private boost::noncopyable
      */
     AggregatedDataWithoutKey without_key = nullptr;

-    std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key>> key8;
-    std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key>> key16;
+    // Disable consecutive key optimization for UInt8/16, because they use a FixedHashMap
+    // and the lookup there is almost free, so we don't need to cache the last lookup result.
+    std::unique_ptr<AggregationMethodOneNumber<UInt8, AggregatedDataWithUInt8Key, false>> key8;
+    std::unique_ptr<AggregationMethodOneNumber<UInt16, AggregatedDataWithUInt16Key, false>> key16;

     std::unique_ptr<AggregationMethodOneNumber<UInt32, AggregatedDataWithUInt64Key>> key32;
     std::unique_ptr<AggregationMethodOneNumber<UInt64, AggregatedDataWithUInt64Key>> key64;
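The consecutive_keys_optimization flag controls a small cache inside the hashing state: the last key and the slot it mapped to are remembered, so a run of identical adjacent keys skips the hash-table probe entirely. For UInt8/16 keys backed by a FixedHashMap the probe is a plain array index, which is why the diff turns the cache off there. A distilled sketch of the idea, assuming nothing about the real ColumnsHashing::HashMethodOneNumber beyond what the diff shows:

#include <cstdint>
#include <unordered_map>

template <typename Key, typename Mapped>
struct CachingLookup
{
    std::unordered_map<Key, Mapped> map;
    Key last_key {};
    Mapped * last_slot = nullptr;

    Mapped & find(Key key)
    {
        if (last_slot && key == last_key)
            return *last_slot;      /// Hit: a repeated key skips the hash probe.
        last_key = key;
        last_slot = &map[key];      /// Miss: do the real lookup (and insert).
        return *last_slot;
    }
};

int main()
{
    CachingLookup<uint32_t, uint64_t> lookup;
    for (uint32_t key : {1u, 1u, 1u, 2u, 2u, 1u})
        ++lookup.find(key);         /// Consecutive repeats reuse the cached slot.
    return (lookup.map.at(1) == 4 && lookup.map.at(2) == 2) ? 0 : 1;
}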
dbms/src/Storages/MergeTree/DataPartsExchange.cpp

@@ -55,6 +55,9 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*body*/

     String part_name = params.get("part");

+    /// Validation of the input that may come from a malicious replica.
+    MergeTreePartInfo::fromPartName(part_name, data.format_version);
+
     static std::atomic_uint total_sends {0};

     if ((data.settings.replicated_max_parallel_sends && total_sends >= data.settings.replicated_max_parallel_sends)

@@ -169,6 +172,9 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
     bool to_detached,
     const String & tmp_prefix_)
 {
+    /// Validation of the input that may come from a malicious replica.
+    MergeTreePartInfo::fromPartName(part_name, data.format_version);
+
     Poco::URI uri;
     uri.setScheme(interserver_scheme);
     uri.setHost(host);
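Both endpoints now validate the part name before using it: parsing with MergeTreePartInfo::fromPartName throws on anything that is not a well-formed part name, so a malicious replica cannot smuggle path components through the interserver protocol. A minimal sketch of the same parse-before-use pattern, with a hypothetical validator that merely whitelists characters (ClickHouse's real parser checks the full part-name structure):

#include <stdexcept>
#include <string>

/// Hypothetical stand-in for MergeTreePartInfo::fromPartName: accept only
/// names built from a safe alphabet, so path separators never get through.
static void validatePartName(const std::string & name)
{
    if (name.empty() || name.find_first_not_of(
            "abcdefghijklmnopqrstuvwxyz"
            "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
            "0123456789_-") != std::string::npos)
        throw std::invalid_argument("Bad part name: " + name);
}

int main()
{
    validatePartName("20190724_1_2_0");           /// Well-formed: passes.
    try
    {
        validatePartName("../../../etc/passwd");  /// Rejected before any path is built.
    }
    catch (const std::invalid_argument &) {}
}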
dbms/src/Storages/StorageCatBoostPool.cpp (deleted file)

@@ -1,303 +0,0 @@
#include <fstream>
#include <sstream>
#include <boost/filesystem.hpp>

#include <Storages/StorageCatBoostPool.h>
#include <DataStreams/IBlockInputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataStreams/FilterColumnsBlockInputStream.h>
#include <Interpreters/Context.h>
#include <Parsers/ASTIdentifier.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int CANNOT_OPEN_FILE;
    extern const int CANNOT_PARSE_TEXT;
    extern const int DATABASE_ACCESS_DENIED;
}

namespace
{
class CatBoostDatasetBlockInputStream : public IBlockInputStream
{
public:

    CatBoostDatasetBlockInputStream(const std::string & file_name, const std::string & format_name,
            const Block & sample_block, const Context & context, UInt64 max_block_size)
        : file_name(file_name), format_name(format_name)
    {
        read_buf = std::make_unique<ReadBufferFromFile>(file_name);
        reader = FormatFactory::instance().getInput(format_name, *read_buf, sample_block, context, max_block_size);
    }

    String getName() const override
    {
        return "CatBoostDataset";
    }

    Block readImpl() override
    {
        return reader->read();
    }

    void readPrefixImpl() override
    {
        reader->readPrefix();
    }

    void readSuffixImpl() override
    {
        reader->readSuffix();
    }

    Block getHeader() const override { return reader->getHeader(); }

private:
    std::unique_ptr<ReadBufferFromFileDescriptor> read_buf;
    BlockInputStreamPtr reader;
    std::string file_name;
    std::string format_name;
};

}

static boost::filesystem::path canonicalPath(std::string && path)
{
    return boost::filesystem::canonical(boost::filesystem::path(path));
}

static std::string resolvePath(const boost::filesystem::path & base_path, std::string && path)
{
    boost::filesystem::path resolved_path(path);
    if (!resolved_path.is_absolute())
        return boost::filesystem::canonical(resolved_path, base_path).string();
    return boost::filesystem::canonical(resolved_path).string();
}

static void checkCreationIsAllowed(const String & base_path, const String & path)
{
    if (base_path != path.substr(0, base_path.size()))
        throw Exception(
            "Using file descriptor or user specified path as source of storage isn't allowed for server daemons",
            ErrorCodes::DATABASE_ACCESS_DENIED);
}


StorageCatBoostPool::StorageCatBoostPool(
        const String & database_name_,
        const String & table_name_,
        const Context & context,
        String column_description_file_name_,
        String data_description_file_name_)
    : table_name(table_name_)
    , database_name(database_name_)
    , column_description_file_name(std::move(column_description_file_name_))
    , data_description_file_name(std::move(data_description_file_name_))
{
    auto base_path = canonicalPath(context.getPath());
    column_description_file_name = resolvePath(base_path, std::move(column_description_file_name));
    data_description_file_name = resolvePath(base_path, std::move(data_description_file_name));
    if (context.getApplicationType() == Context::ApplicationType::SERVER)
    {
        const auto & base_path_str = base_path.string();
        checkCreationIsAllowed(base_path_str, column_description_file_name);
        checkCreationIsAllowed(base_path_str, data_description_file_name);
    }

    parseColumnDescription();
    createSampleBlockAndColumns();
}

std::string StorageCatBoostPool::getColumnTypesString(const ColumnTypesMap & columnTypesMap)
{
    std::string types_string;
    bool first = true;
    for (const auto & value : columnTypesMap)
    {
        if (!first)
            types_string.append(", ");

        first = false;
        types_string += value.first;
    }

    return types_string;
}

void StorageCatBoostPool::checkDatasetDescription()
{
    std::ifstream in(data_description_file_name);
    if (!in.good())
        throw Exception("Cannot open file: " + data_description_file_name, ErrorCodes::CANNOT_OPEN_FILE);

    std::string line;
    if (!std::getline(in, line))
        throw Exception("File is empty: " + data_description_file_name, ErrorCodes::CANNOT_PARSE_TEXT);

    size_t columns_count = 1;
    for (char sym : line)
        if (sym == '\t')
            ++columns_count;

    columns_description.resize(columns_count);
}

void StorageCatBoostPool::parseColumnDescription()
{
    /// NOTE: simple parsing
    /// TODO: use ReadBufferFromFile

    checkDatasetDescription();

    std::ifstream in(column_description_file_name);
    if (!in.good())
        throw Exception("Cannot open file: " + column_description_file_name, ErrorCodes::CANNOT_OPEN_FILE);

    std::string line;
    size_t line_num = 0;
    auto column_types_map = getColumnTypesMap();
    auto column_types_string = getColumnTypesString(column_types_map);

    /// Enumerate default names for columns as Auxiliary, Auxiliary1, Auxiliary2, ...
    std::map<DatasetColumnType, size_t> columns_per_type_count;

    while (std::getline(in, line))
    {
        ++line_num;
        std::string str_line_num = std::to_string(line_num);

        if (line.empty())
            continue;

        std::istringstream iss(line);
        std::vector<std::string> tokens;
        std::string token;
        while (std::getline(iss, token, '\t'))
            tokens.push_back(token);

        if (tokens.size() != 2 && tokens.size() != 3)
            throw Exception("Cannot parse column description at line " + str_line_num + " '" + line + "' "
                    + ": expected 2 or 3 columns, got " + std::to_string(tokens.size()),
                    ErrorCodes::CANNOT_PARSE_TEXT);

        std::string str_id = tokens[0];
        std::string col_type = tokens[1];
        std::string col_alias = tokens.size() > 2 ? tokens[2] : "";

        size_t num_id;
        try
        {
            num_id = std::stoull(str_id);
        }
        catch (std::exception & e)
        {
            throw Exception("Cannot parse column index at row " + str_line_num + ": " + e.what(),
                    ErrorCodes::CANNOT_PARSE_TEXT);
        }

        if (num_id >= columns_description.size())
            throw Exception("Invalid index at row " + str_line_num + ": " + str_id
                    + ", expected in range [0, " + std::to_string(columns_description.size()) + ")",
                    ErrorCodes::CANNOT_PARSE_TEXT);

        if (column_types_map.count(col_type) == 0)
            throw Exception("Invalid column type: " + col_type + ", expected: " + column_types_string,
                    ErrorCodes::CANNOT_PARSE_TEXT);

        auto type = column_types_map[col_type];

        std::string col_name;

        bool is_feature_column = type == DatasetColumnType::Num || type == DatasetColumnType::Categ;
        auto & col_number = columns_per_type_count[type];
        /// If the column is not a feature, skip the '0' after the name (to use 'Target' instead of 'Target0').
        col_name = col_type + (is_feature_column || col_number ? std::to_string(col_number) : "");
        ++col_number;

        columns_description[num_id] = ColumnDescription(col_name, col_alias, type);
    }
}

void StorageCatBoostPool::createSampleBlockAndColumns()
{
    ColumnsDescription columns;
    NamesAndTypesList cat_columns;
    NamesAndTypesList num_columns;
    NamesAndTypesList other_columns;
    sample_block.clear();

    auto get_type = [](DatasetColumnType column_type) -> DataTypePtr
    {
        if (column_type == DatasetColumnType::Categ
            || column_type == DatasetColumnType::Auxiliary
            || column_type == DatasetColumnType::DocId)
            return std::make_shared<DataTypeString>();
        else
            return std::make_shared<DataTypeFloat64>();
    };

    for (auto & desc : columns_description)
    {
        DataTypePtr type = get_type(desc.column_type);

        if (desc.column_type == DatasetColumnType::Categ)
            cat_columns.emplace_back(desc.column_name, type);
        else if (desc.column_type == DatasetColumnType::Num)
            num_columns.emplace_back(desc.column_name, type);
        else
            other_columns.emplace_back(desc.column_name, type);

        sample_block.insert(ColumnWithTypeAndName(type, desc.column_name));
    }

    /// Order is important: first numeric columns, then categorical, then all others.
    for (const auto & column : num_columns)
        columns.add(DB::ColumnDescription(column.name, column.type, false));
    for (const auto & column : cat_columns)
        columns.add(DB::ColumnDescription(column.name, column.type, false));
    for (const auto & column : other_columns)
    {
        DB::ColumnDescription column_desc(column.name, column.type, false);
        /// We assign the Materialized kind to the column so that it doesn't show up in SELECT *.
        /// Because the table is read-only, we do not need a default expression.
        column_desc.default_desc.kind = ColumnDefaultKind::Materialized;
        columns.add(std::move(column_desc));
    }

    for (auto & desc : columns_description)
    {
        if (!desc.alias.empty())
        {
            DB::ColumnDescription column(desc.alias, get_type(desc.column_type), false);
            column.default_desc.kind = ColumnDefaultKind::Alias;
            column.default_desc.expression = std::make_shared<ASTIdentifier>(desc.column_name);
            columns.add(std::move(column));
        }
    }

    setColumns(columns);
}

BlockInputStreams StorageCatBoostPool::read(
        const Names & column_names,
        const SelectQueryInfo & /*query_info*/,
        const Context & context,
        QueryProcessingStage::Enum /*processed_stage*/,
        size_t max_block_size,
        unsigned /*threads*/)
{
    auto stream = std::make_shared<CatBoostDatasetBlockInputStream>(
        data_description_file_name, "TSV", sample_block, context, max_block_size);

    auto filter_stream = std::make_shared<FilterColumnsBlockInputStream>(stream, column_names, false);
    return { filter_stream };
}

}
dbms/src/Storages/StorageCatBoostPool.h (deleted file)

@@ -1,82 +0,0 @@
#pragma once

#include <Storages/IStorage.h>
#include <Core/Defines.h>
#include <ext/shared_ptr_helper.h>

namespace DB
{

class StorageCatBoostPool : public ext::shared_ptr_helper<StorageCatBoostPool>, public IStorage
{
public:
    std::string getName() const override { return "CatBoostPool"; }
    std::string getTableName() const override { return table_name; }
    std::string getDatabaseName() const override { return database_name; }

    BlockInputStreams read(const Names & column_names,
            const SelectQueryInfo & query_info,
            const Context & context,
            QueryProcessingStage::Enum processed_stage,
            size_t max_block_size,
            unsigned threads) override;

private:
    String table_name;
    String database_name;

    String column_description_file_name;
    String data_description_file_name;
    Block sample_block;

    enum class DatasetColumnType
    {
        Target,
        Num,
        Categ,
        Auxiliary,
        DocId,
        Weight,
        Baseline
    };

    using ColumnTypesMap = std::map<std::string, DatasetColumnType>;

    ColumnTypesMap getColumnTypesMap() const
    {
        return
        {
            {"Target", DatasetColumnType::Target},
            {"Num", DatasetColumnType::Num},
            {"Categ", DatasetColumnType::Categ},
            {"Auxiliary", DatasetColumnType::Auxiliary},
            {"DocId", DatasetColumnType::DocId},
            {"Weight", DatasetColumnType::Weight},
            {"Baseline", DatasetColumnType::Baseline},
        };
    }

    std::string getColumnTypesString(const ColumnTypesMap & columnTypesMap);

    struct ColumnDescription
    {
        std::string column_name;
        std::string alias;
        DatasetColumnType column_type;

        ColumnDescription() : column_type(DatasetColumnType::Num) {}
        ColumnDescription(std::string column_name, std::string alias, DatasetColumnType column_type)
            : column_name(std::move(column_name)), alias(std::move(alias)), column_type(column_type) {}
    };

    std::vector<ColumnDescription> columns_description;

    void checkDatasetDescription();
    void parseColumnDescription();
    void createSampleBlockAndColumns();

protected:
    StorageCatBoostPool(const String & database_name_, const String & table_name_, const Context & context,
            String column_description_file_name, String data_description_file_name);
};

}
dbms/src/TableFunctions/TableFunctionCatBoostPool.cpp (deleted file)

@@ -1,56 +0,0 @@
#include <TableFunctions/TableFunctionCatBoostPool.h>
#include <Storages/StorageCatBoostPool.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/ASTFunction.h>
#include <Common/typeid_cast.h>
#include <Parsers/ASTLiteral.h>


namespace DB
{

namespace ErrorCodes
{
    extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
    extern const int BAD_ARGUMENTS;
}


StoragePtr TableFunctionCatBoostPool::executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const
{
    ASTs & args_func = ast_function->children;

    std::string err = "Table function '" + getName() + "' requires 2 parameters: "
        + "column descriptions file, dataset description file";

    if (args_func.size() != 1)
        throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    ASTs & args = args_func.at(0)->children;

    if (args.size() != 2)
        throw Exception(err, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);

    auto getStringLiteral = [](const IAST & node, const char * description)
    {
        const auto * lit = node.as<ASTLiteral>();
        if (!lit)
            throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);

        if (lit->value.getType() != Field::Types::String)
            throw Exception(description + String(" must be string literal (in single quotes)."), ErrorCodes::BAD_ARGUMENTS);

        return safeGet<const String &>(lit->value);
    };
    String column_descriptions_file = getStringLiteral(*args[0], "Column descriptions file");
    String dataset_description_file = getStringLiteral(*args[1], "Dataset description file");

    return StorageCatBoostPool::create(getDatabaseName(), table_name, context, column_descriptions_file, dataset_description_file);
}

void registerTableFunctionCatBoostPool(TableFunctionFactory & factory)
{
    factory.registerFunction<TableFunctionCatBoostPool>();
}

}
dbms/src/TableFunctions/TableFunctionCatBoostPool.h (deleted file)

@@ -1,21 +0,0 @@
#pragma once

#include <TableFunctions/ITableFunction.h>


namespace DB
{

/* catboostPool('column_descriptions_file', 'dataset_description_file')
 * Create storage from CatBoost dataset.
 */
class TableFunctionCatBoostPool : public ITableFunction
{
public:
    static constexpr auto name = "catBoostPool";
    std::string getName() const override { return name; }
private:
    StoragePtr executeImpl(const ASTPtr & ast_function, const Context & context, const std::string & table_name) const override;
};

}
dbms/src/TableFunctions/registerTableFunctions.cpp

@@ -10,7 +10,6 @@ namespace DB
 void registerTableFunctionMerge(TableFunctionFactory & factory);
 void registerTableFunctionRemote(TableFunctionFactory & factory);
 void registerTableFunctionNumbers(TableFunctionFactory & factory);
-void registerTableFunctionCatBoostPool(TableFunctionFactory & factory);
 void registerTableFunctionFile(TableFunctionFactory & factory);
 void registerTableFunctionURL(TableFunctionFactory & factory);
 void registerTableFunctionValues(TableFunctionFactory & factory);

@@ -37,7 +36,6 @@ void registerTableFunctions()
     registerTableFunctionMerge(factory);
     registerTableFunctionRemote(factory);
     registerTableFunctionNumbers(factory);
-    registerTableFunctionCatBoostPool(factory);
     registerTableFunctionFile(factory);
     registerTableFunctionURL(factory);
     registerTableFunctionValues(factory);
dbms/tests/performance/empty_string_deserialization.xml (new file)

@@ -0,0 +1,20 @@
<test>
    <type>loop</type>

    <stop_conditions>
        <all_of>
            <iterations>10</iterations>
        </all_of>
    </stop_conditions>

    <!-- gcc-8 generates 20% faster code than gcc-9;
         clang-8 generates more than two times slower code than gcc -->

    <create_query>CREATE TABLE empty_strings (s String) ENGINE = Log;</create_query>
    <fill_query>INSERT INTO empty_strings SELECT '' FROM numbers(1000000000);</fill_query>

    <query>SELECT count() FROM empty_strings</query>

    <drop_query>DROP TABLE IF EXISTS empty_strings</drop_query>
</test>
dbms/tests/performance/empty_string_serialization.xml (new file)

@@ -0,0 +1,17 @@
<test>
    <type>loop</type>

    <stop_conditions>
        <all_of>
            <iterations>10</iterations>
        </all_of>
    </stop_conditions>

    <!-- gcc-8 generates 20% faster code than gcc-9;
         clang-8 generates more than two times slower code than gcc -->

    <create_query>CREATE TABLE empty_strings (s String) ENGINE = Log;</create_query>
    <query>INSERT INTO empty_strings SELECT '' FROM numbers(100000000);</query>
    <drop_query>DROP TABLE IF EXISTS empty_strings</drop_query>
</test>
dbms/tests/queries/0_stateless/00689_catboost_pool_files.reference (deleted file)

@@ -1 +0,0 @@
Hello
dbms/tests/queries/0_stateless/00689_catboost_pool_files.sh (deleted file)

@@ -1,32 +0,0 @@
#!/usr/bin/env bash

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh

${CLICKHOUSE_CLIENT} --query="drop table if exists catboost_pool_desc;"
${CLICKHOUSE_CLIENT} --query="drop table if exists catboost_pool_vals;"
${CLICKHOUSE_CLIENT} --query="create table catboost_pool_desc (id String, type String) engine = File(TSV);"
${CLICKHOUSE_CLIENT} --query="insert into catboost_pool_desc select '0', 'Categ';"
${CLICKHOUSE_CLIENT} --query="create table catboost_pool_vals (str String) engine = File(TSV);"
${CLICKHOUSE_CLIENT} --query="insert into catboost_pool_vals select 'Hello';"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', 'data/${CLICKHOUSE_DATABASE}/catboost_pool_vals/data.TSV');"

${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('data/${CLICKHOUSE_DATABASE}/catboost_pool_desc/data.TSV', '../../../../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"

${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('${CURDIR}/00689_file.txt', '${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../${CURDIR}/00689_file.txt', '../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../../${CURDIR}/00689_file.txt', '../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../../../${CURDIR}/00689_file.txt', '../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../../../../${CURDIR}/00689_file.txt', '../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../../../../../${CURDIR}/00689_file.txt', '../../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"
${CLICKHOUSE_CLIENT} --query="select * from catBoostPool('../../../../../../${CURDIR}/00689_file.txt', '../../../../../../${CURDIR}/00689_file.txt');" 2>&1 | grep -o "Data"

${CLICKHOUSE_CLIENT} --query="drop table if exists catboost_pool_desc;"
${CLICKHOUSE_CLIENT} --query="drop table if exists catboost_pool_vals;"
dbms/tests/queries/0_stateless/00979_yandex_consistent_hash_fpe.sql (new file)

@@ -0,0 +1 @@
SELECT yandexConsistentHash(-1, 40000); -- { serverError 36 }
libs/consistent-hashing/consistent_hashing.h

@@ -15,5 +15,5 @@
  * It requires O(1) memory and cpu to calculate. So, it is faster than classic
  * consistent hashing algos with points on circle.
  */
-std::size_t ConsistentHashing(std::uint64_t x, std::size_t n); // Works good for n < 65536
-std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n); // Works good for n < 4294967296
+std::size_t ConsistentHashing(std::uint64_t x, std::size_t n); // Works for n <= 32768
+std::size_t ConsistentHashing(std::uint64_t lo, std::uint64_t hi, std::size_t n); // Works for n <= 2^31
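This tightened limit also explains the type change earlier in the diff: with n capped at 32768, every bucket index of yandexConsistentHash fits in a UInt16 ResultType, and the new checkBucketsRange guard makes a query such as SELECT yandexConsistentHash(-1, 40000) fail cleanly with serverError 36 (BAD_ARGUMENTS, the code thrown in FunctionsConsistentHashing.h) instead of running the algorithm outside its effective domain, which is exactly what the new stateless test asserts.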