mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-24 18:50:49 +00:00
Merge branch 'master' into update-openssl
This commit is contained in:
commit
2d69c96876
@ -26,6 +26,11 @@ if (ENABLE_REPLXX)
|
||||
)
|
||||
endif ()
|
||||
|
||||
if (USE_DEBUG_HELPERS)
|
||||
set (INCLUDE_DEBUG_HELPERS "-include ${ClickHouse_SOURCE_DIR}/base/common/iostream_debug_helpers.h")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
|
||||
endif ()
|
||||
|
||||
add_library (common ${SRCS})
|
||||
|
||||
target_include_directories(common PUBLIC .. ${CMAKE_CURRENT_BINARY_DIR}/..)
|
||||
|
@ -4,7 +4,7 @@ endif()
|
||||
|
||||
if (ENABLE_PARQUET)
|
||||
|
||||
if (NOT OS_FREEBSD AND NOT OS_DARWIN) # Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory
|
||||
if (NOT OS_FREEBSD) # Freebsd: ../contrib/arrow/cpp/src/arrow/util/bit-util.h:27:10: fatal error: endian.h: No such file or directory
|
||||
option(USE_INTERNAL_PARQUET_LIBRARY "Set to FALSE to use system parquet library instead of bundled" ${NOT_UNBUNDLED})
|
||||
endif()
|
||||
|
||||
|
@ -36,6 +36,11 @@ if (NOT MSVC)
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra")
|
||||
endif ()
|
||||
|
||||
if (USE_DEBUG_HELPERS)
|
||||
set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/base -include ${ClickHouse_SOURCE_DIR}/dbms/src/Core/iostream_debug_helpers.h")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
|
||||
endif ()
|
||||
|
||||
# Add some warnings that are not available even with -Wall -Wextra -Wpedantic.
|
||||
|
||||
option (WEVERYTHING "Enables -Weverything option with some exceptions. This is intended for exploration of new compiler warnings that may be found to be useful. Only makes sense for clang." ON)
|
||||
|
@ -992,9 +992,9 @@ public:
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
|
||||
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) override
|
||||
{
|
||||
/// Special case when multiply aggregate function state
|
||||
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
|
||||
|
@ -24,7 +24,6 @@
|
||||
|
||||
#include <Interpreters/castColumn.h>
|
||||
|
||||
#include <Functions/FunctionsLogical.h>
|
||||
#include <Functions/IFunctionAdaptors.h>
|
||||
#include <Functions/FunctionHelpers.h>
|
||||
|
||||
@ -37,6 +36,15 @@
|
||||
#include <limits>
|
||||
#include <type_traits>
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
#include <DataTypes/Native.h>
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic ignored "-Wunused-parameter"
|
||||
#include <llvm/IR/IRBuilder.h>
|
||||
#pragma GCC diagnostic pop
|
||||
#endif
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -957,26 +965,26 @@ private:
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count);
|
||||
|
||||
template <typename ComparisonFunction, typename ConvolutionFunction>
|
||||
void executeTupleEqualityImpl(Block & block, size_t result, const ColumnsWithTypeAndName & x, const ColumnsWithTypeAndName & y,
|
||||
size_t tuple_size, size_t input_rows_count)
|
||||
void executeTupleEqualityImpl(
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_compare,
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_convolution,
|
||||
Block & block,
|
||||
size_t result,
|
||||
const ColumnsWithTypeAndName & x,
|
||||
const ColumnsWithTypeAndName & y,
|
||||
size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
if (0 == tuple_size)
|
||||
throw Exception("Comparison of zero-sized tuples is not implemented.", ErrorCodes::NOT_IMPLEMENTED);
|
||||
|
||||
auto func_compare = ComparisonFunction::create(context);
|
||||
auto func_convolution = ConvolutionFunction::create(context);
|
||||
|
||||
auto func_compare_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_compare));
|
||||
auto func_convolution_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_convolution));
|
||||
|
||||
Block tmp_block;
|
||||
for (size_t i = 0; i < tuple_size; ++i)
|
||||
{
|
||||
tmp_block.insert(x[i]);
|
||||
tmp_block.insert(y[i]);
|
||||
|
||||
auto impl = func_compare_adaptor.build({x[i], y[i]});
|
||||
auto impl = func_compare->build({x[i], y[i]});
|
||||
|
||||
/// Comparison of the elements.
|
||||
tmp_block.insert({ nullptr, std::make_shared<DataTypeUInt8>(), "" });
|
||||
@ -998,34 +1006,30 @@ private:
|
||||
convolution_args[i] = i * 3 + 2;
|
||||
|
||||
ColumnsWithTypeAndName convolution_types(convolution_args.size(), { nullptr, std::make_shared<DataTypeUInt8>(), "" });
|
||||
auto impl = func_convolution_adaptor.build(convolution_types);
|
||||
auto impl = func_convolution->build(convolution_types);
|
||||
|
||||
impl->execute(tmp_block, convolution_args, tuple_size * 3, input_rows_count);
|
||||
block.getByPosition(result).column = tmp_block.getByPosition(tuple_size * 3).column;
|
||||
}
|
||||
|
||||
template <typename HeadComparisonFunction, typename TailComparisonFunction>
|
||||
void executeTupleLessGreaterImpl(Block & block, size_t result, const ColumnsWithTypeAndName & x,
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size, size_t input_rows_count)
|
||||
void executeTupleLessGreaterImpl(
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_compare_head,
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_compare_tail,
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_and,
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_or,
|
||||
std::shared_ptr<IFunctionOverloadResolver> func_equals,
|
||||
Block & block,
|
||||
size_t result,
|
||||
const ColumnsWithTypeAndName & x,
|
||||
const ColumnsWithTypeAndName & y,
|
||||
size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
auto func_compare_head = HeadComparisonFunction::create(context);
|
||||
auto func_compare_tail = TailComparisonFunction::create(context);
|
||||
auto func_and = FunctionAnd::create(context);
|
||||
auto func_or = FunctionOr::create(context);
|
||||
auto func_equals = FunctionComparison<EqualsOp, NameEquals>::create(context);
|
||||
|
||||
auto func_compare_head_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_compare_head));
|
||||
auto func_compare_tail_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_compare_tail));
|
||||
auto func_equals_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_equals));
|
||||
|
||||
ColumnsWithTypeAndName bin_args = {{ nullptr, std::make_shared<DataTypeUInt8>(), "" },
|
||||
{ nullptr, std::make_shared<DataTypeUInt8>(), "" }};
|
||||
|
||||
auto func_and_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_and))
|
||||
.build(bin_args);
|
||||
|
||||
auto func_or_adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(func_or))
|
||||
.build(bin_args);
|
||||
auto func_and_adaptor = func_and->build(bin_args);
|
||||
auto func_or_adaptor = func_or->build(bin_args);
|
||||
|
||||
Block tmp_block;
|
||||
|
||||
@ -1039,18 +1043,18 @@ private:
|
||||
|
||||
if (i + 1 != tuple_size)
|
||||
{
|
||||
auto impl_head = func_compare_head_adaptor.build({x[i], y[i]});
|
||||
auto impl_head = func_compare_head->build({x[i], y[i]});
|
||||
impl_head->execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 2, input_rows_count);
|
||||
|
||||
tmp_block.insert({ nullptr, std::make_shared<DataTypeUInt8>(), "" });
|
||||
|
||||
auto impl_equals = func_equals_adaptor.build({x[i], y[i]});
|
||||
auto impl_equals = func_equals->build({x[i], y[i]});
|
||||
impl_equals->execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 3, input_rows_count);
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
auto impl_tail = func_compare_tail_adaptor.build({x[i], y[i]});
|
||||
auto impl_tail = func_compare_tail->build({x[i], y[i]});
|
||||
impl_tail->execute(tmp_block, {i * 4, i * 4 + 1}, i * 4 + 2, input_rows_count);
|
||||
}
|
||||
}
|
||||
@ -1157,8 +1161,7 @@ public:
|
||||
|
||||
if (left_tuple && right_tuple)
|
||||
{
|
||||
auto adaptor = FunctionOverloadResolverAdaptor(
|
||||
std::make_unique<DefaultOverloadResolver>(FunctionComparison<Op, Name>::create(context)));
|
||||
auto adaptor = FunctionOverloadResolverAdaptor(std::make_unique<DefaultOverloadResolver>(FunctionComparison<Op, Name>::create(context)));
|
||||
|
||||
size_t size = left_tuple->getElements().size();
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
|
@ -18,7 +18,10 @@ void FunctionComparison<EqualsOp, NameEquals>::executeTupleImpl(Block & block, s
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleEqualityImpl<FunctionEquals, FunctionAnd>(block, result, x, y, tuple_size, input_rows_count);
|
||||
return executeTupleEqualityImpl(
|
||||
FunctionFactory::instance().get("equals", context),
|
||||
FunctionFactory::instance().get("and", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,7 +17,15 @@ void FunctionComparison<GreaterOp, NameGreater>::executeTupleImpl(Block & block,
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleLessGreaterImpl<FunctionGreater, FunctionGreater>(block, result, x, y, tuple_size, input_rows_count);
|
||||
auto greater = FunctionFactory::instance().get("greater", context);
|
||||
|
||||
return executeTupleLessGreaterImpl(
|
||||
greater,
|
||||
greater,
|
||||
FunctionFactory::instance().get("and", context),
|
||||
FunctionFactory::instance().get("or", context),
|
||||
FunctionFactory::instance().get("equals", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,9 +17,13 @@ void FunctionComparison<GreaterOrEqualsOp, NameGreaterOrEquals>::executeTupleImp
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleLessGreaterImpl<
|
||||
FunctionComparison<GreaterOp, NameGreater>,
|
||||
FunctionGreaterOrEquals>(block, result, x, y, tuple_size, input_rows_count);
|
||||
return executeTupleLessGreaterImpl(
|
||||
FunctionFactory::instance().get("greater", context),
|
||||
FunctionFactory::instance().get("greaterOrEquals", context),
|
||||
FunctionFactory::instance().get("and", context),
|
||||
FunctionFactory::instance().get("or", context),
|
||||
FunctionFactory::instance().get("equals", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,7 +17,15 @@ void FunctionComparison<LessOp, NameLess>::executeTupleImpl(Block & block, size_
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleLessGreaterImpl<FunctionLess, FunctionLess>(block, result, x, y, tuple_size, input_rows_count);
|
||||
auto less = FunctionFactory::instance().get("less", context);
|
||||
|
||||
return executeTupleLessGreaterImpl(
|
||||
less,
|
||||
less,
|
||||
FunctionFactory::instance().get("and", context),
|
||||
FunctionFactory::instance().get("or", context),
|
||||
FunctionFactory::instance().get("equals", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,9 +17,13 @@ void FunctionComparison<LessOrEqualsOp, NameLessOrEquals>::executeTupleImpl(Bloc
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleLessGreaterImpl<
|
||||
FunctionComparison<LessOp, NameLess>,
|
||||
FunctionLessOrEquals>(block, result, x, y, tuple_size, input_rows_count);
|
||||
return executeTupleLessGreaterImpl(
|
||||
FunctionFactory::instance().get("less", context),
|
||||
FunctionFactory::instance().get("lessOrEquals", context),
|
||||
FunctionFactory::instance().get("and", context),
|
||||
FunctionFactory::instance().get("or", context),
|
||||
FunctionFactory::instance().get("equals", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,7 +17,10 @@ void FunctionComparison<NotEqualsOp, NameNotEquals>::executeTupleImpl(Block & bl
|
||||
const ColumnsWithTypeAndName & y, size_t tuple_size,
|
||||
size_t input_rows_count)
|
||||
{
|
||||
return executeTupleEqualityImpl<FunctionNotEquals, FunctionOr>(block, result, x, y, tuple_size, input_rows_count);
|
||||
return executeTupleEqualityImpl(
|
||||
FunctionFactory::instance().get("notEquals", context),
|
||||
FunctionFactory::instance().get("or", context),
|
||||
block, result, x, y, tuple_size, input_rows_count);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -4,6 +4,11 @@ add_library(clickhouse_parsers ${clickhouse_parsers_headers} ${clickhouse_parser
|
||||
target_link_libraries(clickhouse_parsers PUBLIC clickhouse_common_io)
|
||||
target_include_directories(clickhouse_parsers PUBLIC ${DBMS_INCLUDE_DIR})
|
||||
|
||||
if (USE_DEBUG_HELPERS)
|
||||
set (INCLUDE_DEBUG_HELPERS "-I${ClickHouse_SOURCE_DIR}/base -include ${ClickHouse_SOURCE_DIR}/dbms/src/Parsers/iostream_debug_helpers.h")
|
||||
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${INCLUDE_DEBUG_HELPERS}")
|
||||
endif ()
|
||||
|
||||
if(ENABLE_TESTS)
|
||||
add_subdirectory(tests)
|
||||
endif()
|
||||
|
@ -658,11 +658,12 @@ void PipelineExecutor::executeSingleThread(size_t thread_num, size_t num_threads
|
||||
total_time_ns = total_time_watch.elapsed();
|
||||
wait_time_ns = total_time_ns - execution_time_ns - processing_time_ns;
|
||||
|
||||
LOG_TRACE(log, "Thread finished."
|
||||
<< " Total time: " << (total_time_ns / 1e9) << " sec."
|
||||
<< " Execution time: " << (execution_time_ns / 1e9) << " sec."
|
||||
<< " Processing time: " << (processing_time_ns / 1e9) << " sec."
|
||||
<< " Wait time: " << (wait_time_ns / 1e9) << "sec.");
|
||||
LOG_TRACE(log, std::fixed << std::setprecision(3)
|
||||
<< "Thread finished."
|
||||
<< " Total time: " << (total_time_ns / 1e9) << " sec."
|
||||
<< " Execution time: " << (execution_time_ns / 1e9) << " sec."
|
||||
<< " Processing time: " << (processing_time_ns / 1e9) << " sec."
|
||||
<< " Wait time: " << (wait_time_ns / 1e9) << " sec.");
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -690,14 +691,14 @@ void PipelineExecutor::executeImpl(size_t num_threads)
|
||||
bool finished_flag = false;
|
||||
|
||||
SCOPE_EXIT(
|
||||
if (!finished_flag)
|
||||
{
|
||||
finish();
|
||||
if (!finished_flag)
|
||||
{
|
||||
finish();
|
||||
|
||||
for (auto & thread : threads)
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
}
|
||||
for (auto & thread : threads)
|
||||
if (thread.joinable())
|
||||
thread.join();
|
||||
}
|
||||
);
|
||||
|
||||
addChildlessProcessorsToStack(stack);
|
||||
|
@ -260,13 +260,6 @@ namespace DB
|
||||
throw Exception{"Error while reading " + format_name + " data: " + read_status.ToString(),
|
||||
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||
|
||||
if (0 == table->num_rows())
|
||||
throw Exception{"Empty table in input data", ErrorCodes::EMPTY_DATA_PASSED};
|
||||
|
||||
if (header.columns() > static_cast<size_t>(table->num_columns()))
|
||||
// TODO: What if some columns were not presented? Insert NULLs? What if a column is not nullable?
|
||||
throw Exception{"Number of columns is less than the table has", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH};
|
||||
|
||||
++row_group_current;
|
||||
|
||||
NameToColumnPtr name_to_column_ptr;
|
||||
|
@ -4,91 +4,171 @@
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <IO/BufferBase.h>
|
||||
#include <IO/ReadBufferFromMemory.h>
|
||||
#include <IO/ReadBufferFromFileDescriptor.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <arrow/api.h>
|
||||
#include <arrow/io/api.h>
|
||||
#include <arrow/status.h>
|
||||
#include <parquet/arrow/reader.h>
|
||||
#include <parquet/file_reader.h>
|
||||
#include "ArrowColumnToCHColumn.h"
|
||||
|
||||
#include <common/logger_useful.h>
|
||||
|
||||
|
||||
#include <sys/stat.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
|
||||
: IInputFormat(std::move(header_), in_)
|
||||
class RandomAccessFileFromSeekableReadBuffer : public arrow::io::RandomAccessFile
|
||||
{
|
||||
public:
|
||||
RandomAccessFileFromSeekableReadBuffer(SeekableReadBuffer& in_, off_t file_size_)
|
||||
: in(in_)
|
||||
, file_size(file_size_)
|
||||
, is_closed(false)
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
Chunk ParquetBlockInputFormat::generate()
|
||||
virtual arrow::Status GetSize(int64_t* size) override
|
||||
{
|
||||
Chunk res;
|
||||
auto &header = getPort().getHeader();
|
||||
*size = file_size;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
if (!in.eof())
|
||||
virtual arrow::Status Close() override
|
||||
{
|
||||
is_closed = true;
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
virtual arrow::Status Tell(int64_t* position) const override
|
||||
{
|
||||
*position = in.getPosition();
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
virtual bool closed() const override { return is_closed; }
|
||||
|
||||
virtual arrow::Status Read(int64_t nbytes, int64_t* bytes_read, void* out) override
|
||||
{
|
||||
*bytes_read = in.readBig(reinterpret_cast<char *>(out), nbytes);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
virtual arrow::Status Read(int64_t nbytes, std::shared_ptr<arrow::Buffer>* out) override
|
||||
{
|
||||
std::shared_ptr<arrow::Buffer> buf;
|
||||
ARROW_RETURN_NOT_OK(arrow::AllocateBuffer(nbytes, &buf));
|
||||
size_t n = in.readBig(reinterpret_cast<char *>(buf->mutable_data()), nbytes);
|
||||
*out = arrow::SliceBuffer(buf, 0, n);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
virtual arrow::Status Seek(int64_t position) override
|
||||
{
|
||||
in.seek(position, SEEK_SET);
|
||||
return arrow::Status::OK();
|
||||
}
|
||||
|
||||
private:
|
||||
SeekableReadBuffer& in;
|
||||
off_t file_size;
|
||||
bool is_closed;
|
||||
};
|
||||
|
||||
|
||||
static std::shared_ptr<arrow::io::RandomAccessFile> as_arrow_file(ReadBuffer & in)
|
||||
{
|
||||
if (auto fd_in = dynamic_cast<ReadBufferFromFileDescriptor*>(&in))
|
||||
{
|
||||
struct stat stat;
|
||||
auto res = ::fstat(fd_in->getFD(), &stat);
|
||||
// if fd is a regular file i.e. not stdin
|
||||
if (res == 0 && S_ISREG(stat.st_mode))
|
||||
{
|
||||
/*
|
||||
First we load whole stream into string (its very bad and limiting .parquet file size to half? of RAM)
|
||||
Then producing blocks for every row_group (dont load big .parquet files with one row_group - it can eat x10+ RAM from .parquet file size)
|
||||
*/
|
||||
|
||||
if (row_group_current < row_group_total)
|
||||
throw Exception{"Got new data, but data from previous chunks was not read " +
|
||||
std::to_string(row_group_current) + "/" + std::to_string(row_group_total),
|
||||
ErrorCodes::CANNOT_READ_ALL_DATA};
|
||||
|
||||
file_data.clear();
|
||||
{
|
||||
WriteBufferFromString file_buffer(file_data);
|
||||
copyData(in, file_buffer);
|
||||
}
|
||||
|
||||
buffer = std::make_unique<arrow::Buffer>(file_data);
|
||||
// TODO: maybe use parquet::RandomAccessSource?
|
||||
auto status = parquet::arrow::FileReader::Make(
|
||||
::arrow::default_memory_pool(),
|
||||
parquet::ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(*buffer)),
|
||||
&file_reader);
|
||||
|
||||
row_group_total = file_reader->num_row_groups();
|
||||
row_group_current = 0;
|
||||
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size);
|
||||
}
|
||||
//DUMP(row_group_current, row_group_total);
|
||||
if (row_group_current >= row_group_total)
|
||||
return res;
|
||||
}
|
||||
|
||||
// TODO: also catch a ParquetException thrown by filereader?
|
||||
//arrow::Status read_status = filereader.ReadTable(&table);
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, &table);
|
||||
// fallback to loading the entire file in memory
|
||||
std::string file_data;
|
||||
{
|
||||
WriteBufferFromString file_buffer(file_data);
|
||||
copyData(in, file_buffer);
|
||||
}
|
||||
return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
|
||||
}
|
||||
|
||||
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
}
|
||||
|
||||
#define THROW_ARROW_NOT_OK(status) \
|
||||
do \
|
||||
{ \
|
||||
if (::arrow::Status _s = (status); !_s.ok()) \
|
||||
throw Exception(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
|
||||
} while (false)
|
||||
|
||||
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_)
|
||||
: IInputFormat(std::move(header_), in_)
|
||||
{
|
||||
THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(as_arrow_file(in_), arrow::default_memory_pool(), &file_reader));
|
||||
row_group_total = file_reader->num_row_groups();
|
||||
|
||||
std::shared_ptr<arrow::Schema> schema;
|
||||
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
|
||||
|
||||
for (int i = 0; i < schema->num_fields(); ++i)
|
||||
{
|
||||
if (getPort().getHeader().has(schema->field(i)->name()))
|
||||
{
|
||||
column_indices.push_back(i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Chunk ParquetBlockInputFormat::generate()
|
||||
{
|
||||
Chunk res;
|
||||
auto &header = getPort().getHeader();
|
||||
|
||||
if (row_group_current >= row_group_total)
|
||||
return res;
|
||||
}
|
||||
|
||||
void ParquetBlockInputFormat::resetParser()
|
||||
{
|
||||
IInputFormat::resetParser();
|
||||
std::shared_ptr<arrow::Table> table;
|
||||
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
|
||||
ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, read_status, header, row_group_current, "Parquet");
|
||||
return res;
|
||||
}
|
||||
|
||||
file_reader.reset();
|
||||
file_data.clear();
|
||||
buffer.reset();
|
||||
row_group_total = 0;
|
||||
row_group_current = 0;
|
||||
}
|
||||
void ParquetBlockInputFormat::resetParser()
|
||||
{
|
||||
IInputFormat::resetParser();
|
||||
|
||||
void registerInputFormatProcessorParquet(FormatFactory &factory)
|
||||
{
|
||||
factory.registerInputFormatProcessor(
|
||||
"Parquet",
|
||||
[](ReadBuffer &buf,
|
||||
const Block &sample,
|
||||
const RowInputFormatParams &,
|
||||
const FormatSettings & /* settings */)
|
||||
{
|
||||
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
|
||||
});
|
||||
}
|
||||
file_reader.reset();
|
||||
row_group_total = 0;
|
||||
row_group_current = 0;
|
||||
}
|
||||
|
||||
void registerInputFormatProcessorParquet(FormatFactory &factory)
|
||||
{
|
||||
factory.registerInputFormatProcessor(
|
||||
"Parquet",
|
||||
[](ReadBuffer &buf,
|
||||
const Block &sample,
|
||||
const RowInputFormatParams &,
|
||||
const FormatSettings & /* settings */)
|
||||
{
|
||||
return std::make_shared<ParquetBlockInputFormat>(buf, sample);
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
@ -27,13 +27,10 @@ protected:
|
||||
Chunk generate() override;
|
||||
|
||||
private:
|
||||
|
||||
// TODO: check that this class implements every part of its parent
|
||||
|
||||
std::unique_ptr<parquet::arrow::FileReader> file_reader;
|
||||
std::string file_data;
|
||||
std::unique_ptr<arrow::Buffer> buffer;
|
||||
int row_group_total = 0;
|
||||
// indices of columns to read from Parquet file
|
||||
std::vector<int> column_indices;
|
||||
int row_group_current = 0;
|
||||
};
|
||||
|
||||
|
@ -19,7 +19,6 @@
|
||||
#include <Parsers/parseQuery.h>
|
||||
#include <Parsers/queryToString.h>
|
||||
#include <DataStreams/ExpressionBlockInputStream.h>
|
||||
#include <DataStreams/MarkInCompressedFile.h>
|
||||
#include <Formats/FormatFactory.h>
|
||||
#include <DataStreams/copyData.h>
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
|
@ -3,7 +3,6 @@
|
||||
#include <optional>
|
||||
#include <Core/Types.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartType.h>
|
||||
#include <DataStreams/MarkInCompressedFile.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
|
||||
#include <Storages/MergeTree/checkDataPart.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
|
||||
#include <DataStreams/MarkInCompressedFile.h>
|
||||
#include <Compression/CompressedReadBuffer.h>
|
||||
#include <IO/HashingReadBuffer.h>
|
||||
#include <Common/CurrentMetrics.h>
|
||||
|
78
dbms/tests/performance/arithmetic.xml
Normal file
78
dbms/tests/performance/arithmetic.xml
Normal file
@ -0,0 +1,78 @@
|
||||
<test>
|
||||
<type>loop</type>
|
||||
<stop_conditions>
|
||||
<all_of>
|
||||
<iterations>10</iterations>
|
||||
<min_time_not_changing_for_ms>100</min_time_not_changing_for_ms>
|
||||
</all_of>
|
||||
<any_of>
|
||||
<iterations>100</iterations>
|
||||
<total_time_ms>1000</total_time_ms>
|
||||
</any_of>
|
||||
</stop_conditions>
|
||||
|
||||
<substitutions>
|
||||
<substitution>
|
||||
<name>arg</name>
|
||||
<values>
|
||||
<value>u8</value>
|
||||
<value>u16</value>
|
||||
<value>u32</value>
|
||||
<value>u64</value>
|
||||
<value>i8</value>
|
||||
<value>i16</value>
|
||||
<value>i32</value>
|
||||
<value>i64</value>
|
||||
<value>f32</value>
|
||||
<value>f64</value>
|
||||
</values>
|
||||
</substitution>
|
||||
<substitution>
|
||||
<name>op</name>
|
||||
<values>
|
||||
<value>plus</value>
|
||||
<value>minus</value>
|
||||
<value>multiply</value>
|
||||
<value>divide</value>
|
||||
<value>intDivOrZero</value>
|
||||
</values>
|
||||
</substitution>
|
||||
</substitutions>
|
||||
|
||||
<create_query>
|
||||
CREATE TABLE nums
|
||||
(
|
||||
u8 UInt8,
|
||||
u16 UInt16,
|
||||
u32 UInt32,
|
||||
u64 UInt64,
|
||||
i8 Int8,
|
||||
i16 Int16,
|
||||
i32 Int32,
|
||||
i64 Int64,
|
||||
f32 Float32,
|
||||
f64 Float64
|
||||
) ENGINE = Memory;
|
||||
</create_query>
|
||||
|
||||
<fill_query>
|
||||
INSERT INTO nums
|
||||
WITH cityHash64(number) AS x
|
||||
SELECT
|
||||
toUInt8(x),
|
||||
toUInt16(x),
|
||||
toUInt32(x),
|
||||
toUInt64(x),
|
||||
toInt8(x),
|
||||
toInt16(x),
|
||||
toInt32(x),
|
||||
toInt64(x),
|
||||
toFloat32(x),
|
||||
toFloat64(x)
|
||||
FROM numbers(100000000);
|
||||
</fill_query>
|
||||
|
||||
<query>SELECT count() FROM nums WHERE NOT ignore({op}({arg}, {arg}))</query>
|
||||
|
||||
<drop_query>DROP TABLE nums</drop_query>
|
||||
</test>
|
@ -174,16 +174,16 @@ Code: 8. DB::Ex---tion: Column "element" is not presented in input data
|
||||
Code: 33. DB::Ex---tion: Error while reading Parquet data: NotImplemented: Reading lists of structs from Parquet files not yet supported: key_value: list<key_value: struct<key: string not null, value: struct<key_value: list<key_value: struct<key: int32 not null, value: bool not null> not null> not null>> not null> not null
|
||||
|
||||
=== Try load data from nonnullable.impala.parquet
|
||||
Code: 33. DB::Ex---tion: Error while reading Parquet data: NotImplemented: Reading lists of structs from Parquet files not yet supported: map: list<map: struct<key: string not null, value: int32 not null> not null> not null
|
||||
Code: 8. DB::Ex---tion: Column "element" is not presented in input data
|
||||
|
||||
=== Try load data from nullable.impala.parquet
|
||||
Code: 33. DB::Ex---tion: Error while reading Parquet data: NotImplemented: Reading lists of structs from Parquet files not yet supported: map: list<map: struct<key: string not null, value: int32> not null> not null
|
||||
Code: 8. DB::Ex---tion: Column "element" is not presented in input data
|
||||
|
||||
=== Try load data from nulls.snappy.parquet
|
||||
Code: 8. DB::Ex---tion: Column "b_c_int" is not presented in input data
|
||||
|
||||
=== Try load data from repeated_no_annotation.parquet
|
||||
Code: 33. DB::Ex---tion: Error while reading Parquet data: NotImplemented: Reading lists of structs from Parquet files not yet supported: phone: list<phone: struct<number: int64 not null, kind: string> not null> not null
|
||||
Code: 8. DB::Ex---tion: Column "number" is not presented in input data
|
||||
|
||||
=== Try load data from userdata1.parquet
|
||||
1454486129 1 Amanda Jordan ajordan0@com.com Female 1.197.201.2 6759521864920116 Indonesia 3/8/1971 49756.53 Internal Auditor 1E+02
|
||||
|
Loading…
Reference in New Issue
Block a user