106 changes: 69 additions & 37 deletions be/src/olap/rowset/segment_v2/variant/variant_column_writer_impl.cpp
@@ -292,7 +292,7 @@ Status UnifiedSparseColumnWriter::append_single_sparse(

// Build path frequency statistics with upper bound limit to avoid
// large memory and metadata size. Persist to meta for readers.
std::unordered_map<StringRef, size_t> path_counts;
phmap::flat_hash_map<StringRef, size_t, StringRefHash> path_counts;
const auto [paths, _] = src.get_sparse_data_paths_and_values();
size_t limit = parent_column.variant_max_sparse_column_statistics_size();
for (size_t i = 0; i != paths->size(); ++i) {
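The map swapped in by this hunk implements a bounded frequency count: a path already in the map always has its count bumped, while new paths are admitted only as long as the map is smaller than variant_max_sparse_column_statistics_size. A minimal sketch of that pattern, assuming parallel-hashmap is on the include path and using std::string in place of Doris's non-owning StringRef (count_paths and its signature are illustrative, not the Doris API):

```cpp
#include <parallel_hashmap/phmap.h>

#include <algorithm>
#include <string>
#include <vector>

// Bounded path-frequency counting, as in append_single_sparse above.
phmap::flat_hash_map<std::string, size_t> count_paths(
        const std::vector<std::string>& paths, size_t limit) {
    phmap::flat_hash_map<std::string, size_t> counts;
    counts.reserve(std::min(paths.size(), limit));
    for (const auto& p : paths) {
        if (auto it = counts.find(p); it != counts.end()) {
            ++it->second;         // known path: always bump its count
        } else if (counts.size() < limit) {
            counts.emplace(p, 1); // new path: admit only below the cap
        }
        // paths first seen after the cap is reached are simply not tracked
    }
    return counts;
}
```

Since phmap::flat_hash_map keeps its entries in a single open-addressed array, the switch from std::unordered_map mainly buys back the per-node allocations on this hot write path.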
@@ -324,6 +324,14 @@ Status UnifiedSparseColumnWriter::append_bucket_sparse(
const int bucket_num = static_cast<int>(_bucket_writers.size());
const auto [paths_col, values_col] = src.get_sparse_data_paths_and_values();
const auto& offsets = src.serialized_sparse_column_offsets();
phmap::flat_hash_map<StringRef, uint32_t, StringRefHash> path_to_bucket;
path_to_bucket.reserve(std::min<size_t>(paths_col->size(), 8192));
const size_t limit = parent_column.variant_max_sparse_column_statistics_size();
std::vector<phmap::flat_hash_map<StringRef, size_t, StringRefHash>> bucket_path_counts(
bucket_num);
for (int b = 0; b < bucket_num; ++b) {
bucket_path_counts[b].reserve(std::min<size_t>(limit, 1024));
}

std::vector<vectorized::MutableColumnPtr> tmp_maps(bucket_num);
for (int b = 0; b < bucket_num; ++b) {
@@ -335,19 +343,38 @@ Status UnifiedSparseColumnWriter::append_bucket_sparse(
auto& m = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
m.get_offsets().reserve(num_rows);
}
for (ssize_t row = 0; row < static_cast<ssize_t>(num_rows); ++row) {
size_t start = offsets[row - 1];
size_t end = offsets[row];
std::vector<vectorized::ColumnString*> bucket_keys(bucket_num);
std::vector<vectorized::ColumnString*> bucket_values(bucket_num);
std::vector<vectorized::ColumnArray::Offsets64*> bucket_offsets(bucket_num);
for (int b = 0; b < bucket_num; ++b) {
auto& m = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
bucket_keys[b] = &assert_cast<vectorized::ColumnString&>(m.get_keys());
bucket_values[b] = &assert_cast<vectorized::ColumnString&>(m.get_values());
bucket_offsets[b] = &m.get_offsets();
}
for (size_t row = 0; row < num_rows; ++row) {
const size_t start = offsets[row - 1];
const size_t end = offsets[row];
for (size_t i = start; i < end; ++i) {
StringRef path = paths_col->get_data_at(i);
uint32_t b = vectorized::variant_util::variant_binary_shard_of(path, bucket_num);
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
map_col.get_keys_ptr()->assume_mutable()->insert_from(*paths_col, i);
map_col.get_values_ptr()->assume_mutable()->insert_from(*values_col, i);
uint32_t b = 0;
if (auto it = path_to_bucket.find(path); it != path_to_bucket.end()) {
b = it->second;
} else {
b = vectorized::variant_util::variant_binary_shard_of(path, bucket_num);
path_to_bucket.emplace(path, b);
}
bucket_keys[b]->insert_data(path.data, path.size);
bucket_values[b]->insert_from(*values_col, i);
auto& path_counts = bucket_path_counts[b];
if (auto it = path_counts.find(path); it != path_counts.end()) {
++it->second;
} else if (path_counts.size() < limit) {
path_counts.emplace(path, 1);
}
}
for (int b = 0; b < bucket_num; ++b) {
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
map_col.get_offsets().push_back(map_col.get_keys().size());
bucket_offsets[b]->push_back(bucket_keys[b]->size());
}
}
for (int b = 0; b < bucket_num; ++b) {
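Two changes in this hunk go beyond the map type: the per-bucket assert_casts are hoisted out of the row loop into bucket_keys/bucket_values/bucket_offsets, and the path-to-bucket shard is memoized, since sparse data repeats a small set of paths across rows (the StringRef keys remain valid because they point into the source paths column, which outlives the loop). A sketch of the memo under the same assumptions as above, with hash_shard_of standing in for vectorized::variant_util::variant_binary_shard_of and std::string for StringRef; the memo only pays off when the shard hash is costlier than the map's own hash:

```cpp
#include <parallel_hashmap/phmap.h>

#include <cstdint>
#include <functional>
#include <string>

// Illustrative memo for path -> bucket sharding; not the Doris API.
// Assumes bucket_num > 0.
class ShardMemo {
public:
    explicit ShardMemo(int bucket_num) : _bucket_num(bucket_num) {}

    uint32_t shard(const std::string& path) {
        if (auto it = _memo.find(path); it != _memo.end()) {
            return it->second; // seen before: reuse the cached bucket
        }
        uint32_t b = hash_shard_of(path); // first sighting: compute once
        _memo.emplace(path, b);
        return b;
    }

private:
    // Stand-in for variant_binary_shard_of; any stable hash works here.
    uint32_t hash_shard_of(const std::string& path) const {
        return static_cast<uint32_t>(std::hash<std::string>{}(path) % _bucket_num);
    }

    int _bucket_num;
    phmap::flat_hash_map<std::string, uint32_t> _memo;
};
```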
@@ -364,22 +391,9 @@ Status UnifiedSparseColumnWriter::append_bucket_sparse(
converter->clear_source_content(this_col_id);
_bucket_opts[b].meta->set_num_rows(num_rows);
}
// per-bucket statistics
for (int b = 0; b < bucket_num; ++b) {
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
const auto& keys = assert_cast<const vectorized::ColumnString&>(map_col.get_keys());
std::unordered_map<StringRef, size_t> bucket_path_counts;
bucket_path_counts.reserve(1024);
size_t limit = parent_column.variant_max_sparse_column_statistics_size();
for (size_t i = 0; i < keys.size(); ++i) {
StringRef k = keys.get_data_at(i);
if (auto it = bucket_path_counts.find(k); it != bucket_path_counts.end())
++it->second;
else if (bucket_path_counts.size() < limit)
bucket_path_counts.emplace(k, 1);
}
segment_v2::VariantStatistics bucket_stats;
for (const auto& [k, cnt] : bucket_path_counts) {
for (const auto& [k, cnt] : bucket_path_counts[b]) {
bucket_stats.sparse_column_non_null_size.emplace(k.to_string(),
static_cast<int64_t>(cnt));
}
@@ -422,24 +436,43 @@ Status VariantDocWriter::append_data(const TabletColumn* parent_column,
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
map_col.get_offsets().reserve(num_rows);
}
std::vector<vectorized::ColumnString*> bucket_keys(_bucket_num);
std::vector<vectorized::ColumnString*> bucket_values(_bucket_num);
std::vector<vectorized::ColumnArray::Offsets64*> bucket_offsets(_bucket_num);
for (int b = 0; b < _bucket_num; ++b) {
auto& m = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
bucket_keys[b] = &assert_cast<vectorized::ColumnString&>(m.get_keys());
bucket_values[b] = &assert_cast<vectorized::ColumnString&>(m.get_values());
bucket_offsets[b] = &m.get_offsets();
}

std::vector<std::unordered_map<StringRef, uint32_t>> bucket_path_counts(_bucket_num);
std::vector<phmap::flat_hash_map<StringRef, uint32_t, StringRefHash>> bucket_path_counts(
_bucket_num);
const size_t limit = parent_column->variant_max_sparse_column_statistics_size();
for (int b = 0; b < _bucket_num; ++b) {
bucket_path_counts[b].reserve(std::min<size_t>(limit, 1024));
}
phmap::flat_hash_map<StringRef, uint32_t, StringRefHash> path_to_bucket;
path_to_bucket.reserve(std::min<size_t>(paths_col->size(), 8192));

for (size_t row = 0; row < num_rows; ++row) {
const ssize_t srow = static_cast<ssize_t>(row);
size_t start = offsets[srow - 1];
size_t end = offsets[srow];
const size_t start = (row == 0) ? 0 : offsets[row - 1];
const size_t end = offsets[row];
for (size_t i = start; i < end; ++i) {
StringRef path = paths_col->get_data_at(i);
uint32_t bucket = vectorized::variant_util::variant_binary_shard_of(path, _bucket_num);
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[bucket]);
map_col.get_keys_ptr()->assume_mutable()->insert_from(*paths_col, i);
map_col.get_values_ptr()->assume_mutable()->insert_from(*values_col, i);
uint32_t bucket = 0;
if (auto it = path_to_bucket.find(path); it != path_to_bucket.end()) {
bucket = it->second;
} else {
bucket = vectorized::variant_util::variant_binary_shard_of(path, _bucket_num);
path_to_bucket.emplace(path, bucket);
}
bucket_keys[bucket]->insert_data(path.data, path.size);
bucket_values[bucket]->insert_from(*values_col, i);
bucket_path_counts[bucket][path]++;
}
for (int b = 0; b < _bucket_num; ++b) {
auto& map_col = assert_cast<vectorized::ColumnMap&>(*tmp_maps[b]);
map_col.get_offsets().push_back(map_col.get_keys().size());
bucket_offsets[b]->push_back(bucket_keys[b]->size());
}
}
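On the offsets arithmetic in these loops: offsets[row] is the exclusive end of a row's entries and offsets[row - 1] its start. Doris's PaddedPODArray is padded so that offsets[-1] reads as zero, which is why append_bucket_sparse can index offsets[row - 1] unguarded for row 0, while this hunk spells out the (row == 0) check. A small sketch of the idiom over a plain std::vector, where the guard is mandatory:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Visit each row's [start, end) slice of a flattened entries array.
// With std::vector (no negative-index padding) the row == 0 guard is required.
void for_each_row_slice(const std::vector<uint64_t>& offsets, size_t num_rows) {
    for (size_t row = 0; row < num_rows; ++row) {
        const size_t start = (row == 0) ? 0 : offsets[row - 1];
        const size_t end = offsets[row];
        for (size_t i = start; i < end; ++i) {
            // process entry i of this row (a path/value pair in the real code)
        }
    }
}
```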

@@ -662,7 +695,6 @@ Status VariantColumnWriterImpl::_process_binary_column(
if (_tablet_column->variant_enable_doc_mode()) {
_binary_writer = std::make_unique<VariantDocWriter>();
bucket_num = std::max(1, _tablet_column->variant_doc_hash_shard_count());
ptr->sort_doc_value_column();
} else {
_binary_writer = std::make_unique<UnifiedSparseColumnWriter>();
bucket_num = std::max(1, _tablet_column->variant_sparse_hash_shard_count());
@@ -1176,8 +1208,8 @@ Status VariantDocCompactWriter::finalize() {
const auto& column_offsets = variant_column->serialized_doc_value_column_offsets();
std::map<StringRef, uint32_t> column_stats;
for (int64_t i = 0; i < num_rows; ++i) {
size_t start = column_offsets[i - 1];
size_t end = column_offsets[i];
const size_t start = column_offsets[i - 1];
const size_t end = column_offsets[i];
for (size_t j = start; j < end; ++j) {
const auto& key = column_key->get_data_at(j);
column_stats[key] += 1;
64 changes: 14 additions & 50 deletions be/src/vec/columns/column_variant.cpp
@@ -2483,25 +2483,23 @@ void ColumnVariant::Subcolumn::deserialize_from_binary_column(const ColumnString
DCHECK_EQ(end_ptr - reinterpret_cast<const uint8_t*>(data_ref.data), data_ref.size);
};

// check if the type is same as least common type
// if the type is same as least common type, we can directly deserialize to the subcolumn
// if not, we need to deserialize to the field first, then insert to the subcolumn
bool same_as_least_common_type = type != least_common_type.get_type_id();

// array needs to check nested type is same as least common type's nested type
if (!same_as_least_common_type && type == PrimitiveType::TYPE_ARRAY) {
// |PrimitiveType::TYPE_ARRAY| + |size_t| + |nested_type|
// skip the first 1 byte for PrimitiveType::TYPE_ARRAY and the next sizeof(size_t) bytes for the size of the array
const auto* nested_start_data = start_data + 1 + sizeof(size_t);
const PrimitiveType nested_type = TabletColumn::get_primitive_type_by_field_type(
static_cast<FieldType>(*nested_start_data));
same_as_least_common_type = (nested_type != least_common_type.get_base_type_id());
}

if (same_as_least_common_type) {
bool need_deserialize_to_field = (type != least_common_type.get_type_id());
if (!need_deserialize_to_field && type == PrimitiveType::TYPE_ARRAY) {
need_deserialize_to_field = true;
}

if (need_deserialize_to_field) {
Field res;
FieldInfo info;
const uint8_t* end_data = DataTypeSerDe::deserialize_binary_to_field(start_data, res, info);
if (res.is_complex_field()) {
FieldInfo recomputed;
variant_util::get_field_info(res, &recomputed);
info.scalar_type_id = recomputed.scalar_type_id;
info.have_nulls = recomputed.have_nulls;
info.need_convert = recomputed.need_convert;
info.num_dimensions = recomputed.num_dimensions;
}
check_end(end_data);
insert(std::move(res), std::move(info));
} else {
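The rewrite above also retires the inverted same_as_least_common_type flag (it was set when the types differed) in favor of need_deserialize_to_field, and drops the nested-type comparison so that arrays always take the Field round-trip, with FieldInfo recomputed for complex fields along the way. A hedged sketch of the resulting dispatch, using trimmed stand-ins for the Doris types:

```cpp
#include <cstdint>

// Illustrative only: PrimitiveType and Subcolumn are simplified stand-ins.
enum class PrimitiveType : uint8_t { TYPE_INT, TYPE_STRING, TYPE_ARRAY };

struct Subcolumn {
    PrimitiveType least_common_type_id = PrimitiveType::TYPE_INT;

    void deserialize_entry(PrimitiveType type, const uint8_t* start_data) {
        // Fast path only when the serialized type already equals the least
        // common type; arrays are now unconditionally sent down the slow path.
        bool need_deserialize_to_field = (type != least_common_type_id);
        if (!need_deserialize_to_field && type == PrimitiveType::TYPE_ARRAY) {
            need_deserialize_to_field = true;
        }
        if (need_deserialize_to_field) {
            // slow path: deserialize_binary_to_field(), recompute FieldInfo
            // for complex fields, insert(Field) -- may widen the common type
        } else {
            // fast path: append the serialized bytes straight to the column
        }
        (void)start_data; // elided in this sketch
    }
};
```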
@@ -2580,40 +2578,6 @@ MutableColumnPtr ColumnVariant::clone() const {
return res;
}

void ColumnVariant::sort_doc_value_column() {
const auto& offset = serialized_doc_value_column_offsets();

auto sort_map_by_row_paths = [&](const ColumnString& in_paths, const ColumnString& in_values,
const ColumnArray::Offsets64& in_offsets) -> MutableColumnPtr {
auto sorted = create_binary_column_fn();
auto& sorted_map = assert_cast<ColumnMap&>(*sorted);
auto& out_paths = assert_cast<ColumnString&>(sorted_map.get_keys());
auto& out_values = assert_cast<ColumnString&>(sorted_map.get_values());
auto& out_offsets = sorted_map.get_offsets();
out_offsets.reserve(num_rows);

for (int64_t i = 0; i < num_rows; ++i) {
size_t start = in_offsets[i - 1];
size_t end = in_offsets[i];
std::vector<std::tuple<std::string_view, size_t>> order;
order.reserve(end - start);
for (size_t j = start; j < end; ++j) {
order.emplace_back(in_paths.get_data_at(j).to_string_view(), j);
}
std::sort(order.begin(), order.end());
for (const auto& [p, j] : order) {
out_paths.insert_data(p.data(), p.size());
out_values.insert_from(in_values, j);
}
out_offsets.push_back(out_paths.size());
}
return sorted;
};

auto [path, value] = get_doc_value_data_paths_and_values();
serialized_doc_value_column = sort_map_by_row_paths(*path, *value, offset);
}

bool ColumnVariant::is_doc_mode() const {
const auto& offset = serialized_doc_value_column_offsets();
return subcolumns.size() == 1 && offset[num_rows - 1] != 0;
2 changes: 0 additions & 2 deletions be/src/vec/columns/column_variant.h
@@ -634,8 +634,6 @@ class ColumnVariant final : public COWHelper<IColumn, ColumnVariant> {
return _max_subcolumns_count - current_subcolumns_count;
}

void sort_doc_value_column();

// doc snapshot mode: only root column, and doc snapshot column is not empty
bool is_doc_mode() const;

Expand Down
4 changes: 3 additions & 1 deletion be/src/vec/columns/subcolumn_tree.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
// and modified by Doris

#pragma once
#include <parallel_hashmap/phmap.h>

#include <memory>

#include "runtime/exec_env.h"
@@ -47,7 +49,7 @@ class SubcolumnsTree {
Kind kind = TUPLE;
const Node* parent = nullptr;

std::unordered_map<StringRef, std::shared_ptr<Node>, StringRefHash> children;
phmap::flat_hash_map<StringRef, std::shared_ptr<Node>, StringRefHash> children;

NodeData data;
PathInData path;
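A caveat worth keeping in mind for this last hunk: the children map's StringRef keys are non-owning views, so the bytes they point at (the path parts held by the nodes) must outlive the map; moving from std::unordered_map to phmap::flat_hash_map changes only the storage layout, not that lifetime contract. A self-contained sketch of the pattern, with trimmed stand-ins for StringRef and StringRefHash:

```cpp
#include <parallel_hashmap/phmap.h>

#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <string_view>

// Trimmed stand-ins for Doris's StringRef/StringRefHash: a non-owning
// byte view plus a hash functor, usable together as flat_hash_map keys.
struct StringRef {
    const char* data = nullptr;
    size_t size = 0;
    bool operator==(const StringRef& o) const {
        return size == o.size && (size == 0 || std::memcmp(data, o.data, size) == 0);
    }
};
struct StringRefHash {
    size_t operator()(const StringRef& s) const {
        return std::hash<std::string_view>{}(std::string_view(s.data, s.size));
    }
};

struct Node {
    std::string part; // owns the bytes that a parent's child key views
    phmap::flat_hash_map<StringRef, std::shared_ptr<Node>, StringRefHash> children;

    void add_child(std::shared_ptr<Node> child) {
        // The key views the child's own storage, which lives exactly as
        // long as the map entry that holds the child.
        StringRef key {child->part.data(), child->part.size()};
        children.emplace(key, std::move(child));
    }
};
```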