Merge branch 'master' into csv_unquoted_nulls_and_default_values
Commit 92a8e00db2
@@ -13,7 +13,10 @@ if (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL)
     list (APPEND POCO_COMPONENTS Crypto NetSSL)
 endif ()
 if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
+    set(ENABLE_POCO_MONGODB 1 CACHE BOOL "")
     list (APPEND POCO_COMPONENTS MongoDB)
+else ()
+    set(ENABLE_POCO_MONGODB 0 CACHE BOOL "")
 endif ()
 # TODO: after new poco release with SQL library rename ENABLE_POCO_ODBC -> ENABLE_POCO_SQLODBC
 if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
@@ -37,6 +40,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
     set (ENABLE_DATA_MYSQL 0 CACHE BOOL "")
     set (ENABLE_DATA_POSTGRESQL 0 CACHE BOOL "")
     set (ENABLE_ENCODINGS 0 CACHE BOOL "")
+    set (ENABLE_MONGODB ${ENABLE_POCO_MONGODB} CACHE BOOL "" FORCE)

     # new after 2.0.0:
     set (POCO_ENABLE_ZIP 0 CACHE BOOL "")
@@ -60,7 +64,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
         "${ClickHouse_SOURCE_DIR}/contrib/poco/Util/include/"
     )

-    if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
+    if (ENABLE_POCO_MONGODB)
         set (Poco_MongoDB_LIBRARY PocoMongoDB)
         set (Poco_MongoDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/MongoDB/include/")
     endif ()
contrib/poco (vendored)
@@ -1 +1 @@
-Subproject commit ece721f1085e3894cb5286e8560af84cd1445326
+Subproject commit ea2516be366a73a02a82b499ed4a7db1d40037e0
@@ -384,7 +384,10 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
     {
         const auto & db_and_table = query_context->getInsertionTable();
         if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
-            sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
+        {
+            if (!db_and_table.second.empty())
+                sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
+        }
     }

     /// Send block to the client - table structure.
@@ -126,20 +126,32 @@ private:
    {
        for (size_t i = 0; i < buf_size(); ++i)
        {
-           if (buf[i] && !good(buf[i]))
+           if (buf[i])
            {
-               buf[i] = 0;
-               --m_size;
+               if (!good(buf[i]))
+               {
+                   buf[i] = 0;
+                   --m_size;
+               }
+               /** After removing the elements, there may have been room for items,
+                 * which were placed further than necessary, due to a collision.
+                 * You need to move them.
+                 */
+               else if (i != place(buf[i]))
+               {
+                   HashValue x = buf[i];
+                   buf[i] = 0;
+                   reinsertImpl(x);
+               }
            }
        }

-       /** After removing the elements, there may have been room for items,
-         * which were placed further than necessary, due to a collision.
-         * You need to move them.
+       /** We must process first collision resolution chain once again.
+         * Look at the comment in "resize" function.
          */
-       for (size_t i = 0; i < buf_size(); ++i)
+       for (size_t i = 0; i < buf_size() && buf[i]; ++i)
        {
-           if (unlikely(buf[i] && i != place(buf[i])))
+           if (i != place(buf[i]))
            {
                HashValue x = buf[i];
                buf[i] = 0;
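For readers following the UniquesHashSet change above: in an open-addressing hash table with linear probing, clearing a cell can break the collision chain of elements stored after it, so the survivors must be reinserted or later lookups will stop at the new hole and miss them. A minimal toy sketch of that invariant (hypothetical code for illustration only, not ClickHouse's UniquesHashSet, which additionally reprocesses the wrap-around chain mentioned in the new comment):

#include <array>
#include <cstdint>
#include <cstdio>

// Toy open-addressing table with linear probing; 0 means "empty cell".
struct ToyTable
{
    static constexpr size_t SIZE = 16;
    std::array<uint64_t, SIZE> buf{};

    static size_t place(uint64_t x) { return x % SIZE; }

    void insert(uint64_t x)
    {
        size_t i = place(x);
        while (buf[i] && buf[i] != x)
            i = (i + 1) % SIZE;     // linear probing
        buf[i] = x;
    }

    void erase(uint64_t x)
    {
        size_t i = place(x);
        while (buf[i] && buf[i] != x)
            i = (i + 1) % SIZE;
        if (!buf[i])
            return;
        buf[i] = 0;
        /// Elements after the removed one in the collision chain may have been
        /// displaced past their ideal cell; reinsert them so lookups still work.
        for (size_t j = (i + 1) % SIZE; buf[j]; j = (j + 1) % SIZE)
        {
            uint64_t y = buf[j];
            buf[j] = 0;
            insert(y);
        }
    }
};

int main()
{
    ToyTable t;
    t.insert(1); t.insert(17); t.insert(33);   // all collide at cell 1
    t.erase(17);                               // 33 must move back to cell 2
    std::printf("%d\n", t.buf[2] == 33);       // prints 1
}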
@@ -139,3 +139,7 @@
 /// This number is only used for distributed version compatible.
 /// It could be any magic number.
 #define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE
+
+/// A macro for suppressing warnings about unused variables or function results.
+/// Useful for structured bindings which have no standard way to declare this.
+#define UNUSED(X) (void) (X)
@@ -170,7 +170,7 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \
     M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
     M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
-    M(SettingBool, input_format_defaults_for_omitted_fields, false, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
+    M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
     M(SettingBool, input_format_null_as_default, false, "For CSV format initialize null fields with default values if data type of this field is not nullable") \
     \
     M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
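As a usage sketch of the two settings touched above (a hypothetical table and data, not part of this commit): with input_format_null_as_default enabled, an unquoted NULL in CSV input for a non-Nullable column is initialized from the column's default expression instead of failing to parse.

CREATE TABLE test_defaults (x UInt32, s String DEFAULT 'none') ENGINE = Memory;

SET input_format_null_as_default = 1;

-- The \N targets the non-Nullable column `s`; it receives the default 'none'.
INSERT INTO test_defaults FORMAT CSV 1,\N

SELECT * FROM test_defaults;   -- 1, 'none'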
@@ -51,7 +51,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(

     res_stream = context.getInputFormat(format, *input_buffer_contacenated, header, context.getSettings().max_insert_block_size);

-    if (context.getSettingsRef().input_format_defaults_for_omitted_fields)
+    if (context.getSettingsRef().input_format_defaults_for_omitted_fields && !ast_insert_query->table.empty())
    {
        StoragePtr storage = context.getTable(ast_insert_query->database, ast_insert_query->table);
        auto column_defaults = storage->getColumns().getDefaults();
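The added `!ast_insert_query->table.empty()` guard covers INSERTs whose target is a table function rather than a named table; for those there is no storage to look column defaults up in. A hypothetical query of that shape (the executeQuery.cpp hunk further down carries the matching TODO about remote()):

-- ast_insert_query->table is empty here: the target is a table function.
INSERT INTO FUNCTION remote('127.0.0.1', default.hits) VALUES (1, 'a');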
@@ -50,6 +50,11 @@ RemoteBlockOutputStream::RemoteBlockOutputStream(Connection & connection_,
            if (auto log_queue = CurrentThread::getInternalTextLogsQueue())
                log_queue->pushBlock(std::move(packet.block));
        }
+       else if (Protocol::Server::TableColumns == packet.type)
+       {
+           /// Server could attach ColumnsDescription in front of stream for column defaults. There's no need to pass it through cause
+           /// client's already got this information for remote table. Ignore.
+       }
        else
            throw NetException("Unexpected packet from server (expected Data or Exception, got "
                + String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
@@ -40,8 +40,10 @@ TTLBlockInputStream::TTLBlockInputStream(
            auto it = column_defaults.find(name);

            if (it != column_defaults.end())
-               default_expr_list->children.emplace_back(
-                   setAlias(it->second.expression, it->first));
+           {
+               auto expression = it->second.expression->clone();
+               default_expr_list->children.emplace_back(setAlias(expression, it->first));
+           }
        }
        else
            new_ttl_infos.columns_ttl.emplace(name, ttl_info);
@@ -27,7 +27,7 @@ DatabaseDictionary::DatabaseDictionary(const String & name_)
 {
 }

-void DatabaseDictionary::loadTables(Context &, ThreadPool *, bool)
+void DatabaseDictionary::loadTables(Context &, bool)
 {
 }

@@ -33,7 +33,6 @@ public:

     void loadTables(
         Context & context,
-        ThreadPool * thread_pool,
         bool has_force_restore_data_flag) override;

     bool isTableExist(
@@ -18,7 +18,6 @@ DatabaseMemory::DatabaseMemory(String name_)

 void DatabaseMemory::loadTables(
     Context & /*context*/,
-    ThreadPool * /*thread_pool*/,
     bool /*has_force_restore_data_flag*/)
 {
     /// Nothing to load.
@@ -25,7 +25,6 @@ public:

     void loadTables(
         Context & context,
-        ThreadPool * thread_pool,
         bool has_force_restore_data_flag) override;

     void createTable(
@@ -46,7 +46,7 @@ public:
        throw Exception("MySQL database engine does not support detach table.", ErrorCodes::NOT_IMPLEMENTED);
    }

-   void loadTables(Context &, ThreadPool *, bool) override
+   void loadTables(Context &, bool) override
    {
        /// do nothing
    }
@@ -119,7 +119,6 @@ DatabaseOrdinary::DatabaseOrdinary(String name_, const String & metadata_path_,

 void DatabaseOrdinary::loadTables(
     Context & context,
-    ThreadPool * thread_pool,
     bool has_force_restore_data_flag)
 {
     using FileNames = std::vector<std::string>;
@@ -161,96 +160,68 @@ void DatabaseOrdinary::loadTables(
      */
    std::sort(file_names.begin(), file_names.end());

-   size_t total_tables = file_names.size();
+   const size_t total_tables = file_names.size();
    LOG_INFO(log, "Total " << total_tables << " tables.");

    AtomicStopwatch watch;
    std::atomic<size_t> tables_processed {0};
-   Poco::Event all_tables_processed;
-   ExceptionHandler exception_handler;

-   auto task_function = [&](const String & table)
+   auto loadOneTable = [&](const String & table)
    {
-       SCOPE_EXIT(
-           if (++tables_processed == total_tables)
-               all_tables_processed.set()
-       );
+       loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);

        /// Messages, so that it's not boring to wait for the server to load for a long time.
-       if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
+       if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
            || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
        {
            LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
            watch.restart();
        }
-
-       loadTable(context, metadata_path, *this, name, data_path, table, has_force_restore_data_flag);
    };

-   for (const auto & filename : file_names)
-   {
-       auto task = createExceptionHandledJob(std::bind(task_function, filename), exception_handler);
+   ThreadPool pool(SettingMaxThreads().getAutoValue());

-       if (thread_pool)
-           thread_pool->schedule(task);
-       else
-           task();
+   for (const auto & file_name : file_names)
+   {
+       pool.schedule([&]() { loadOneTable(file_name); });
    }

-   if (thread_pool)
-       all_tables_processed.wait();
-
-   exception_handler.throwIfException();
+   pool.wait();

    /// After all tables was basically initialized, startup them.
-   startupTables(thread_pool);
+   startupTables(pool);
 }


-void DatabaseOrdinary::startupTables(ThreadPool * thread_pool)
+void DatabaseOrdinary::startupTables(ThreadPool & thread_pool)
 {
    LOG_INFO(log, "Starting up tables.");

-   AtomicStopwatch watch;
-   std::atomic<size_t> tables_processed {0};
-   size_t total_tables = tables.size();
-   Poco::Event all_tables_processed;
-   ExceptionHandler exception_handler;
+   const size_t total_tables = tables.size();
+   if (!total_tables)
+       return;

-   auto task_function = [&](const StoragePtr & table)
-   {
-       SCOPE_EXIT(
-           if (++tables_processed == total_tables)
-               all_tables_processed.set()
-       );
+   AtomicStopwatch watch;
+   std::atomic<size_t> tables_processed {0};

-       if ((tables_processed + 1) % PRINT_MESSAGE_EACH_N_TABLES == 0
+   auto startupOneTable = [&](const StoragePtr & table)
+   {
+       table->startup();
+
+       if (++tables_processed % PRINT_MESSAGE_EACH_N_TABLES == 0
            || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
        {
            LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
            watch.restart();
        }
-
-       table->startup();
    };

-   for (const auto & name_storage : tables)
+   for (const auto & table : tables)
    {
-       auto task = createExceptionHandledJob(std::bind(task_function, name_storage.second), exception_handler);
-
-       if (thread_pool)
-           thread_pool->schedule(task);
-       else
-           task();
+       thread_pool.schedule([&]() { startupOneTable(table.second); });
    }

-   if (thread_pool)
-       all_tables_processed.wait();
-
-   exception_handler.throwIfException();
+   thread_pool.wait();
 }
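The new structure relies on ThreadPool's schedule()/wait() pair: schedule every job on a local pool, then wait once, so completion tracking (the removed Poco::Event) and error propagation (the removed ExceptionHandler) come for free. A stand-in sketch of the same pattern using std::async instead of ClickHouse's ThreadPool (which, like a future, rethrows a job's exception at the wait point):

#include <future>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> file_names = {"t1.sql", "t2.sql", "t3.sql"};

    auto load_one_table = [](const std::string & name)
    {
        if (name == "t2.sql")
            throw std::runtime_error("cannot parse " + name);
        std::cout << "loaded " << name << '\n';
    };

    // "schedule": launch one asynchronous job per table definition file.
    std::vector<std::future<void>> tasks;
    for (const auto & file_name : file_names)
        tasks.push_back(std::async(std::launch::async, load_one_table, file_name));

    try
    {
        // "wait": joining rethrows the first worker exception right here,
        // in the caller, with no separate exception-handler object needed.
        for (auto & task : tasks)
            task.get();
    }
    catch (const std::exception & e)
    {
        std::cout << "table loading failed: " << e.what() << '\n';
    }
}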
@@ -19,7 +19,6 @@ public:

     void loadTables(
         Context & context,
-        ThreadPool * thread_pool,
         bool has_force_restore_data_flag) override;

     void createTable(
@@ -73,7 +72,7 @@ private:
     const String data_path;
     Poco::Logger * log;

-    void startupTables(ThreadPool * thread_pool);
+    void startupTables(ThreadPool & thread_pool);

     ASTPtr getCreateTableQueryImpl(const Context & context, const String & table_name, bool throw_on_error) const;
 };
@@ -56,11 +56,10 @@ public:
     /// Get name of database engine.
     virtual String getEngineName() const = 0;

-    /// Load a set of existing tables. If thread_pool is specified, use it.
+    /// Load a set of existing tables.
     /// You can call only once, right after the object is created.
     virtual void loadTables(
         Context & context,
-        ThreadPool * thread_pool,
         bool has_force_restore_data_flag) = 0;

     /// Check the existence of the table.
@@ -41,11 +41,12 @@ namespace
    constexpr UInt64 END_OF_GROUP = static_cast<UInt64>(-2);

    Int64 decodeZigZag(UInt64 n) { return static_cast<Int64>((n >> 1) ^ (~(n & 1) + 1)); }
 }

-[[noreturn]] void unknownFormat()
-{
-    throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
-}
+[[noreturn]] void ProtobufReader::SimpleReader::throwUnknownFormat()
+{
+    throw Exception("Protobuf messages are corrupted or don't match the provided schema. Please note that Protobuf stream is length-delimited: every message is prefixed by its length in varint.", ErrorCodes::UNKNOWN_PROTOBUF_FORMAT);
+}
+

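A side note on the decodeZigZag helper visible in the context above: it implements standard protobuf ZigZag decoding, mapping the unsigned encodings 0, 1, 2, 3, ... to 0, -1, 1, -2, ... A self-contained check (assumed example values, following the protobuf encoding spec):

#include <cstdint>
#include <cstdio>

// Same expression as the decodeZigZag helper in the diff context: shift the
// value right by one and flip all bits when the low bit (the sign flag) is set.
static int64_t decodeZigZag(uint64_t n) { return static_cast<int64_t>((n >> 1) ^ (~(n & 1) + 1)); }

int main()
{
    for (uint64_t n : {0ull, 1ull, 2ull, 3ull, 4294967294ull, 4294967295ull})
        std::printf("%llu -> %lld\n", (unsigned long long) n, (long long) decodeZigZag(n));
    // Prints: 0 -> 0, 1 -> -1, 2 -> 1, 3 -> -2,
    //         4294967294 -> 2147483647, 4294967295 -> -2147483648
}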
@@ -67,7 +68,10 @@ bool ProtobufReader::SimpleReader::startMessage()
        if (unlikely(in.eof()))
            return false;
        size_t size_of_message = readVarint();
+       if (size_of_message == 0)
+           throwUnknownFormat();
        current_message_end = cursor + size_of_message;
+       root_message_end = current_message_end;
    }
    else
    {
@@ -91,7 +95,7 @@ void ProtobufReader::SimpleReader::endMessage()
    else if (unlikely(cursor > current_message_end))
    {
        if (!parent_message_ends.empty())
-           unknownFormat();
+           throwUnknownFormat();
        moveCursorBackward(cursor - current_message_end);
    }
    current_message_end = REACHED_END;
@@ -141,7 +145,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)

    UInt64 varint = readVarint();
    if (unlikely(varint & (static_cast<UInt64>(0xFFFFFFFF) << 32)))
-       unknownFormat();
+       throwUnknownFormat();
    UInt32 key = static_cast<UInt32>(varint);
    field_number = (key >> 3);
    WireType wire_type = static_cast<WireType>(key & 0x07);
@@ -171,7 +175,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
        case GROUP_END:
        {
            if (current_message_end != END_OF_GROUP)
-               unknownFormat();
+               throwUnknownFormat();
            current_message_end = REACHED_END;
            return false;
        }
@@ -181,7 +185,7 @@ bool ProtobufReader::SimpleReader::readFieldNumber(UInt32 & field_number)
            return true;
        }
    }
-   unknownFormat();
+   throwUnknownFormat();
    __builtin_unreachable();
 }

@@ -257,7 +261,7 @@ void ProtobufReader::SimpleReader::ignore(UInt64 num_bytes)
 void ProtobufReader::SimpleReader::moveCursorBackward(UInt64 num_bytes)
 {
    if (in.offset() < num_bytes)
-       unknownFormat();
+       throwUnknownFormat();
    in.position() -= num_bytes;
    cursor -= num_bytes;
 }
@@ -294,7 +298,7 @@ UInt64 ProtobufReader::SimpleReader::continueReadingVarint(UInt64 first_byte)
    PROTOBUF_READER_READ_VARINT_BYTE(10)
 #undef PROTOBUF_READER_READ_VARINT_BYTE

-   unknownFormat();
+   throwUnknownFormat();
    __builtin_unreachable();
 }

@@ -327,7 +331,7 @@ void ProtobufReader::SimpleReader::ignoreVarint()
    PROTOBUF_READER_IGNORE_VARINT_BYTE(10)
 #undef PROTOBUF_READER_IGNORE_VARINT_BYTE

-   unknownFormat();
+   throwUnknownFormat();
 }

 void ProtobufReader::SimpleReader::ignoreGroup()
@@ -371,11 +375,10 @@ void ProtobufReader::SimpleReader::ignoreGroup()
                break;
            }
        }
-       unknownFormat();
+       throwUnknownFormat();
    }
 }

-
 // Implementation for a converter from any protobuf field type to any DB data type.
 class ProtobufReader::ConverterBaseImpl : public ProtobufReader::IConverter
 {
@@ -97,10 +97,19 @@ private:
        bool readUInt(UInt64 & value);
        template<typename T> bool readFixed(T & value);
        bool readStringInto(PaddedPODArray<UInt8> & str);
-       bool ALWAYS_INLINE maybeCanReadValue() const { return field_end != REACHED_END; }
+
+       bool ALWAYS_INLINE maybeCanReadValue() const
+       {
+           if (field_end == REACHED_END)
+               return false;
+           if (cursor < root_message_end)
+               return true;
+
+           throwUnknownFormat();
+       }

    private:
-       void readBinary(void* data, size_t size);
+       void readBinary(void * data, size_t size);
        void ignore(UInt64 num_bytes);
        void moveCursorBackward(UInt64 num_bytes);
@@ -119,6 +128,8 @@ private:
        void ignoreVarint();
        void ignoreGroup();

+       [[noreturn]] static void throwUnknownFormat();
+
        static constexpr UInt64 REACHED_END = 0;

        ReadBuffer & in;
@@ -126,6 +137,8 @@ private:
        std::vector<UInt64> parent_message_ends;
        UInt64 current_message_end;
        UInt64 field_end;
+
+       UInt64 root_message_end;
    };

    class IConverter
@@ -1548,13 +1548,15 @@ public:
        ColumnString::Chars & vec_res_upper_range = col_res_upper_range->getChars();
        vec_res_upper_range.resize(input_rows_count * IPV6_BINARY_LENGTH);

+       static constexpr UInt8 max_cidr_mask = IPV6_BINARY_LENGTH * 8;
+
        for (size_t offset = 0; offset < input_rows_count; ++offset)
        {
            const size_t offset_ipv6 = offset * IPV6_BINARY_LENGTH;
            UInt8 cidr = col_const_cidr_in
                         ? col_const_cidr_in->getValue<UInt8>()
                         : col_cidr_in->getData()[offset];

+           cidr = std::min(cidr, max_cidr_mask);
            applyCIDRMask(&vec_in[offset_ipv6], &vec_res_lower_range[offset_ipv6], &vec_res_upper_range[offset_ipv6], cidr);
        }

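With the clamp in place, any prefix length above 128 bits behaves like /128; the test case added near the end of this commit asserts exactly this:

SELECT IPv6CIDRToRange(IPv6StringToNum('2001:0db8:0000:85a3:0000:0000:ac1f:8001'), 128)
     = IPv6CIDRToRange(IPv6StringToNum('2001:0db8:0000:85a3:0000:0000:ac1f:8001'), 200);  -- returns 1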
@@ -16,7 +16,7 @@ HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
    auto & host = uri.getHost();
    auto port = uri.getPort();
    auto & path = uri.getPath();
-   if (host.empty() || port == 0 || path.empty())
+   if (host.empty() || path.empty())
        throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);

    HDFSBuilderPtr builder(hdfsNewBuilder());
@@ -159,7 +159,7 @@ BlockIO InterpreterCreateQuery::createDatabase(ASTCreateQuery & create)
        if (need_write_metadata)
            Poco::File(metadata_file_tmp_path).renameTo(metadata_file_path);

-       database->loadTables(context, thread_pool, has_force_restore_data_flag);
+       database->loadTables(context, has_force_restore_data_flag);
    }
    catch (...)
    {
@@ -31,11 +31,6 @@ public:

     static ASTPtr formatIndices(const IndicesDescription & indices);

-    void setDatabaseLoadingThreadpool(ThreadPool & thread_pool_)
-    {
-        thread_pool = &thread_pool_;
-    }
-
     void setForceRestoreData(bool has_force_restore_data_flag_)
     {
         has_force_restore_data_flag = has_force_restore_data_flag_;
@@ -61,9 +56,6 @@ private:
     ASTPtr query_ptr;
     Context & context;

-    /// Using while loading database.
-    ThreadPool * thread_pool = nullptr;
-
     /// Skip safety threshold when loading tables.
     bool has_force_restore_data_flag = false;
     /// Is this an internal query - not from the user.
@@ -247,7 +247,12 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
            res = interpreter->execute();

            if (auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
-               context.setInsertionTable(insert_interpreter->getDatabaseTable());
+           {
+               /// Save insertion table (not table function). TODO: support remote() table function.
+               auto db_table = insert_interpreter->getDatabaseTable();
+               if (!db_table.second.empty())
+                   context.setInsertionTable(std::move(db_table));
+           }

            if (process_list_entry)
            {
@@ -33,7 +33,6 @@ static void executeCreateQuery(
     Context & context,
     const String & database,
     const String & file_name,
-    ThreadPool * pool,
     bool has_force_restore_data_flag)
 {
     ParserCreateQuery parser;
@@ -45,8 +44,6 @@ static void executeCreateQuery(

     InterpreterCreateQuery interpreter(ast, context);
     interpreter.setInternal(true);
-    if (pool)
-        interpreter.setDatabaseLoadingThreadpool(*pool);
     interpreter.setForceRestoreData(has_force_restore_data_flag);
     interpreter.execute();
 }
@@ -56,7 +53,6 @@ static void loadDatabase(
     Context & context,
     const String & database,
     const String & database_path,
-    ThreadPool * thread_pool,
     bool force_restore_data)
 {
     /// There may exist .sql file with database creation statement.
@@ -73,7 +69,8 @@ static void loadDatabase(
    else
        database_attach_query = "ATTACH DATABASE " + backQuoteIfNeed(database);

-   executeCreateQuery(database_attach_query, context, database, database_metadata_file, thread_pool, force_restore_data);
+   executeCreateQuery(database_attach_query, context, database,
+                      database_metadata_file, force_restore_data);
 }

@@ -92,9 +89,6 @@ void loadMetadata(Context & context)
    Poco::File force_restore_data_flag_file(context.getFlagsPath() + "force_restore_data");
    bool has_force_restore_data_flag = force_restore_data_flag_file.exists();

-   /// For parallel tables loading.
-   ThreadPool thread_pool(SettingMaxThreads().getAutoValue());
-
    /// Loop over databases.
    std::map<String, String> databases;
    Poco::DirectoryIterator dir_end;
@@ -113,10 +107,8 @@ void loadMetadata(Context & context)
        databases.emplace(unescapeForFileName(it.name()), it.path().toString());
    }

-   for (const auto & elem : databases)
-       loadDatabase(context, elem.first, elem.second, &thread_pool, has_force_restore_data_flag);
-
-   thread_pool.wait();
+   for (const auto & [name, path] : databases)
+       loadDatabase(context, name, path, has_force_restore_data_flag);

    if (has_force_restore_data_flag)
    {
@@ -138,7 +130,7 @@ void loadMetadataSystem(Context & context)
    if (Poco::File(path).exists())
    {
        /// 'has_force_restore_data_flag' is true, to not fail on loading query_log table, if it is corrupted.
-       loadDatabase(context, SYSTEM_DATABASE, path, nullptr, true);
+       loadDatabase(context, SYSTEM_DATABASE, path, true);
    }
    else
    {
@@ -84,7 +84,7 @@ try
    context.setPath("./");
    auto database = std::make_shared<DatabaseOrdinary>("test", "./metadata/test/", context);
    context.addDatabase("test", database);
-   database->loadTables(context, nullptr, false);
+   database->loadTables(context, false);
    context.setCurrentDatabase("test");

    InterpreterCreateQuery interpreter(ast, context);
@@ -39,7 +39,7 @@ try

    DatabasePtr system = std::make_shared<DatabaseOrdinary>("system", "./metadata/system/", context);
    context.addDatabase("system", system);
-   system->loadTables(context, nullptr, false);
+   system->loadTables(context, false);
    attachSystemTablesLocal(*context.getDatabase("system"));
    context.setCurrentDatabase("default");

@@ -15,3 +15,4 @@ ffff:: 4 ('f000::','ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')
 ('ffff:ffff:ffff:ffff::','ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')
 ('::','ff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')
 ('f000::','ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')
+1
@@ -22,3 +22,4 @@ SELECT IPv6CIDRToRange(IPv6StringToNum('2001:0db8:0000:85a3:0000:0000:ac1f:8001'
 SELECT IPv6CIDRToRange(IPv6StringToNum('ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'), 64);
 SELECT IPv6CIDRToRange(IPv6StringToNum('0000:0000:0000:0000:0000:0000:0000:0000'), 8);
 SELECT IPv6CIDRToRange(IPv6StringToNum('ffff:0000:0000:0000:0000:0000:0000:0000'), 4);
+SELECT IPv6CIDRToRange(IPv6StringToNum('2001:0db8:0000:85a3:0000:0000:ac1f:8001'), 128) = IPv6CIDRToRange(IPv6StringToNum('2001:0db8:0000:85a3:0000:0000:ac1f:8001'), 200) ;
@@ -0,0 +1,2 @@
+80041
+80041
dbms/tests/queries/0_stateless/00973_uniq_non_associativity.sql (new file)
@@ -0,0 +1,133 @@
+/* Aggregate function 'uniq' is intended to be associative and provide deterministic results regardless to the schedule of query execution threads and remote servers in a cluster.
+ * But due to subtle bug in implementation it is not associative in very rare cases.
+ * In this test we fill data structure with specific pattern that reproduces this behaviour.
+ */
+
+DROP TABLE IF EXISTS part_a;
+DROP TABLE IF EXISTS part_b;
+DROP TABLE IF EXISTS part_c;
+DROP TABLE IF EXISTS part_d;
+
+/* Create values that will resize hash table to the maximum (131072 cells) and fill it with less than max_fill (65536 cells)
+ * and occupy cells near the end except last 10 cells:
+ * [ ----------- ]
+ * Pick values that will vanish if table will be rehashed.
+ */
+CREATE TABLE part_a ENGINE = TinyLog AS SELECT * FROM
+(
+    WITH
+        number AS k1,
+        bitXor(k1, bitShiftRight(k1, 33)) AS k2,
+        k2 * 0xff51afd7ed558ccd AS k3,
+        bitXor(k3, bitShiftRight(k3, 33)) AS k4,
+        k4 * 0xc4ceb9fe1a85ec53 AS k5,
+        bitXor(k5, bitShiftRight(k5, 33)) AS k6,
+        k6 AS hash,
+        bitShiftRight(hash, 15) % 0x20000 AS place,
+        hash % 2 = 0 AS will_remain
+    SELECT hash, number, place FROM system.numbers WHERE place >= 90000 AND place < 131062 AND NOT will_remain LIMIT 1 BY place LIMIT 41062
+) ORDER BY place;
+
+/* Create values that will resize hash table to the maximum (131072 cells) and fill it with less than max_fill (65536 cells),
+ * but if we use both "a" and "b", it will force rehash.
+ * [ ----------- ]
+ * Pick values that will remain after rehash.
+ */
+CREATE TABLE part_b ENGINE = TinyLog AS SELECT * FROM
+(
+    WITH
+        number AS k1,
+        bitXor(k1, bitShiftRight(k1, 33)) AS k2,
+        k2 * 0xff51afd7ed558ccd AS k3,
+        bitXor(k3, bitShiftRight(k3, 33)) AS k4,
+        k4 * 0xc4ceb9fe1a85ec53 AS k5,
+        bitXor(k5, bitShiftRight(k5, 33)) AS k6,
+        k6 AS hash,
+        bitShiftRight(hash, 15) % 0x20000 AS place,
+        hash % 2 = 0 AS will_remain
+    SELECT hash, number, place FROM system.numbers WHERE place >= 50000 AND place < 90000 AND will_remain LIMIT 1 BY place LIMIT 40000
+) ORDER BY place;
+
+/* Occupy 10 cells near the end of "a":
+ * a: [ ----------- ]
+ * c: [ -- ]
+ * If we insert "a" then "c", these values will be placed at the end of hash table due to collision resolution:
+ * a + c: [ aaaaaaaaaaacc]
+ */
+CREATE TABLE part_c ENGINE = TinyLog AS SELECT * FROM
+(
+    WITH
+        number AS k1,
+        bitXor(k1, bitShiftRight(k1, 33)) AS k2,
+        k2 * 0xff51afd7ed558ccd AS k3,
+        bitXor(k3, bitShiftRight(k3, 33)) AS k4,
+        k4 * 0xc4ceb9fe1a85ec53 AS k5,
+        bitXor(k5, bitShiftRight(k5, 33)) AS k6,
+        k6 AS hash,
+        bitShiftRight(hash, 15) % 0x20000 AS place,
+        hash % 2 = 0 AS will_remain
+    SELECT hash, number, place FROM system.numbers WHERE place >= 131052 AND place < 131062 AND will_remain AND hash NOT IN (SELECT hash FROM part_a) LIMIT 1 BY place LIMIT 10
+) ORDER BY place;
+
+/* Occupy 10 cells at the end of hash table, after "a":
+ * a: [ ----------- ]
+ * d: [ --]
+ * a + d: [ aaaaaaaaaaadd]
+ * But if we insert "a" then "c" then "d", these values will be placed at the beginning of the hash table due to collision resolution:
+ * a+c+d: [dd aaaaaaaaaaacc]
+ */
+CREATE TABLE part_d ENGINE = TinyLog AS SELECT * FROM
+(
+    WITH
+        number AS k1,
+        bitXor(k1, bitShiftRight(k1, 33)) AS k2,
+        k2 * 0xff51afd7ed558ccd AS k3,
+        bitXor(k3, bitShiftRight(k3, 33)) AS k4,
+        k4 * 0xc4ceb9fe1a85ec53 AS k5,
+        bitXor(k5, bitShiftRight(k5, 33)) AS k6,
+        k6 AS hash,
+        bitShiftRight(hash, 15) % 0x20000 AS place,
+        hash % 2 = 0 AS will_remain
+    SELECT hash, number, place FROM system.numbers WHERE place >= 131062 AND will_remain LIMIT 1 BY place LIMIT 10
+) ORDER BY place;
+
+/** What happens if we insert a then c then d then b?
+  * Insertion of b forces rehash.
+  * a will be removed, but c, d, b remain:
+  * [dd bbbbbbbbbb cc]
+  * Then we go through hash table and move elements to better places in collision resolution chain.
+  * c will be moved left to their right place:
+  * [dd bbbbbbbbbb cc ]
+  *
+  * And d must be moved also:
+  * [ bbbbbbbbbb ccdd]
+  * But our algorithm was incorrect and it doesn't happen.
+  *
+  * If we insert d again, it will be placed twice because original d will not found:
+  * [dd bbbbbbbbbb ccdd]
+  * This will lead to slightly higher return value of "uniq" aggregate function and it is dependent on insertion order.
+  */
+
+
+SET max_threads = 1;
+
+/** Results of these two queries must match: */
+
+SELECT uniq(number) FROM (
+    SELECT * FROM part_a
+    UNION ALL SELECT * FROM part_c
+    UNION ALL SELECT * FROM part_d
+    UNION ALL SELECT * FROM part_b);
+
+SELECT uniq(number) FROM (
+    SELECT * FROM part_a
+    UNION ALL SELECT * FROM part_c
+    UNION ALL SELECT * FROM part_d
+    UNION ALL SELECT * FROM part_b
+    UNION ALL SELECT * FROM part_d);
+
+
+DROP TABLE part_a;
+DROP TABLE part_b;
+DROP TABLE part_c;
+DROP TABLE part_d;
@@ -84,7 +84,7 @@ ClickHouse supports the following kinds of keys:
 When querying via a `dictGet*` function, a tuple is passed as the key. Example: `dictGetString('dict_name', 'attr_name', tuple('string for field1', num_for_field2))`.


-## Attributes
+## Attributes {#ext_dict_structure-attributes}

 Example configuration:

@@ -65,7 +65,7 @@ INSERT INTO t VALUES (1, 'Hello, world'), (2, 'abc'), (3, 'def')

 Examples: `1`, `18446744073709551615`, `0xDEADBEEF`, `01`, `0.1`, `1e100`, `-1e-100`, `inf`, `nan`.

-### String
+### String {#syntax-string-literal}

 Only string literals in single quotes are supported. Characters inside can be escaped with a backslash. The following escape sequences have corresponding special values: `\b`, `\f`, `\r`, `\n`, `\t`, `\0`, `\a`, `\v`, `\xHH`. In all other cases, a sequence of the form `\c`, where `c` is any character, is converted to `c`. Thus, the sequences `\'` and `\\` can be used. The value will have the [String](../data_types/string.md) type.
