- Comparison
- apache iceberg
- Delta-lake
- Hudi
- Blogs
- tips
- limitations
- GDPR handling
- metadata table
- Hudi cli
- Z-order
- Bloom investigations
- Comment Investigation
- Sorting
- Custom sort
- GlobalSortPartitionerWithRows
- PartitionPathRepartitionAndSortPartitionerWithRows
- PartitionSortPartitionerWithRows
- z-order support
- Sizing
- Merge On read tables
- Stats columns
- plugins
- Apache paimon
- tla+/fizzbe model
- table interoperability
#spark
Acid Tables
→ Comparison
- Dremio
- set of files
- row level operations
- iceberg vs hudi
- hudi vs iceberg
- iceberg contributors
- hudi contributors
→ apache iceberg
- Indexing hyperspace
- foko conf
- hidden partition
- Config spark
- puffin indexes
- compaction
- Cow and mor delete
- stats index spec
- type promotion is supported
- Kafka/iceberg
- CDC merge
→ limitations
→ Delta-lake
→ Features
- vectorization
- Delta spec
- gdpr
- tuning
- indexing hyperspace
- change data feed
- metadata files are in parquet
- occ
- universal reads iceberg hudi
- cow optim ippon
- autotune size parquet
- delta lake spec
- stats spec
- about UniForm and Iceberg
- type promotion is supported in Delta 4
→ limitations
→ Hudi
→ Blogs
→ tips
- Custom partitioner
- Bulk insert z-order partitioner
- onehouse indexes for upserts
- avoid OCC
- bench
- wow bench
- aws bench
- yet another bench
- bucket
- hudi cli
- hudi writers explained
- hudi reader explained
- record index PR
hudi.metadata-listing-enabled=TRUE
see Athena official doc or Hudi Slack
hive.hudi-metadata-enabled for Presto
- compaction
- clean and archive
- keygenerator
- bulk insert sort
- early conflict detection
- timeline server
- savepoint
- record based index
- one can move from a COW table to MOR, and back from MOR to COW after compaction, see FAQ
- base files, slices, logs explained
- table stats tool
- hudi file layout
- timeline deep dive
- fast copy on write
- fast copy on write hudi rfc
- insert step by step
- column stats index
hoodie.combine.before.delete
when the data is already deduplicated, this avoids the distinct count during deletion
- one table vs uniform
- table format war
- how to choose index
- Disruptor helps insert and bulk_insert only
- read path also MDT spec
- Mor reader code
- spark new reader rfc
hoodie.datasource.hive_sync.filter_pushdown_enabled
when comparing HMS and newly added partitions, this pushes down partitions (otherwise it will fetch all)
- they wrote their own HFile spec!
- delete partition
→ limitations
→ GDPR handling
→ Case of huge append only tables:
One can use MOR tables and bulk insert the data with a global sort for read performance. This would lead to adding base files only. As for updates/deletes, they could be done by adding log files on a regular basis. The table compaction could be done on a monthly basis, which would drastically reduce the file amplification. As for the compaction strategy, they could trigger an inline compaction with large resources from time to time. They should also be careful with OCC to avoid killing the long-running job. They can likely limit the compaction scope either by the number of log files per base file or with a day-based partition strategy; there are lots of strategies (see the config sketch after the questions below).
Another approach would be to leverage the Uber fast copy-on-write feature.
Questions remain:
- would insert produce base files for new partitions? Yes; there is no log file without a base file
- would reading partitions that only contain base files be optimised?
- would compaction need huge resources? There are multiple strategies to split the task
- would changing the table layout between MOR and COW be possible? Yes, see the COW/MOR note above
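A minimal config sketch of this approach, assuming a prepared DataFrame plus made-up table name, key/precombine/partition fields and path (none of these come from the note); the day-based compaction strategy is only one example of limiting the compaction scope:
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: bulk insert with a global sort into a MOR table; compaction is kept out of the
// ingestion job and its scope can be bounded by a compaction strategy (day-based here).
def bulkInsertAppendOnly(df: DataFrame): Unit =
  df.write.format("hudi")
    .option("hoodie.table.name", "huge_append_only")                 // assumed table name
    .option("hoodie.datasource.write.table.type", "MERGE_ON_READ")
    .option("hoodie.datasource.write.operation", "bulk_insert")
    .option("hoodie.datasource.write.recordkey.field", "id")         // assumed key column
    .option("hoodie.datasource.write.precombine.field", "ts")        // assumed precombine column
    .option("hoodie.datasource.write.partitionpath.field", "day")    // assumed day-based partitioning
    .option("hoodie.bulkinsert.sort.mode", "GLOBAL_SORT")            // global sort for read performance
    .option("hoodie.compact.inline", "false")                        // compaction runs as a separate, occasional job
    .option("hoodie.compaction.strategy",
      "org.apache.hudi.table.action.compact.strategy.DayBasedCompactionStrategy") // limit compaction scope
    .mode(SaveMode.Append)
    .save("s3://bucket/huge_append_only")                            // assumed path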
→ case of normal tables
Either method can be used.
→ metadata table
- can be used by athena by specifying
hudi.metadata-listing-enabled=TRUE
- after reading the MDT that log :
table.HoodieTableMetaClient: Finished Loading Table of type MERGE_ON_READ(version=1, baseFileFormat=HFILE)
- may be read incrementally by downstream systems such as StarRocks
- record level index
→ Hudi cli
You can roll back commits, but only in order starting from the latest and going backwards; not an arbitrary given commit.
connect --path s3://foo/bar
help commit rollback
commit rollback --commit 20230605210008584 --rollbackUsingMarkers false
- from 0.13, there is a bundle jar to run the CLI
→ Z-order
- bulk insert with z-order/hilbert curve directly
"hoodie.clustering.plan.strategy.sort.columns": "col1,col2,col3", "hoodie.layout.optimize.build.curve.sample.size": "3",
"hoodie.layout.optimize.curve.build.method": "sample",
"hoodie.layout.optimize.strategy": "hilbert",
"hoodie.bulkinsert.user.defined.partitioner.class": "org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner",
→ Bloom investigations
In apache spark:
- ParquetOutputWriter in spark asks for parquet:ParquetOutputFormat, which gets the bloom configs
- ParquetUtils in spark has a prepareWrite function, which propagates to ParquetOutputWriter
- ParquetWrite in spark has a prepareWrite function, which propagates to ParquetUtils.prepareWrite
- ParquetTable in spark uses ParquetWrite
- ParquetDataSourceV2 in spark uses ParquetTable in getTable (then for read and write)
- not really related: ParquetWriter in spark
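Given that chain, a hedged example of setting the parquet-mr bloom filter keys from a plain Spark write (the column name and output path are assumptions):
import org.apache.spark.sql.DataFrame

// Sketch: Spark forwards per-write options into the Hadoop conf read by ParquetOutputFormat,
// so the parquet-mr bloom filter keys can be enabled per column.
def writeWithBloom(df: DataFrame): Unit =
  df.write
    .option("parquet.bloom.filter.enabled#uuid", "true")        // bloom filter on column uuid (assumed column)
    .option("parquet.bloom.filter.expected.ndv#uuid", "100000") // expected distinct values, used to size the filter
    .parquet("/tmp/bloom_demo")                                 // assumed output path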
In apache hudi:
For bulk_insert:
[HoodieBaseParquetWriter extends ParquetWriter]
[HoodieInternalRowParquetWriter extends HoodieBaseParquetWriter]
[HoodieInternalRowFileWriterFactory return a HoodieInternalRowParquetWriter]
[HoodieRowCreateHandle returns a HoodieInternalRowFileWriterFactory]
For insert:
- [HoodieBaseParquetWriter extends ParquetWriter]
- [HoodieAvroParquetWriter]
→ Comment Investigation
- when reading a regular hive table, it uses the HiveSessionCatalog, which likely grabs the comments - it's a v2SessionCatalog
→ Sorting
→ Custom sort
hoodie.bulkinsert.user.defined.partitioner.class=RowCustomColumnsSortPartitioner
hoodie.bulkinsert.user.defined.partitioner.sort.columns=col1,col2,col3
hoodie.bulkinsert.shuffle.parallelism=20
It does a sort (order by) on specified columns and then coalesce, based on shuffle parallelism.
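Roughly, a sketch of the equivalent Spark transformation (not the actual Hudi implementation):
import org.apache.spark.sql.DataFrame

// Sketch: global ORDER BY on the configured sort columns, then coalesce down to
// hoodie.bulkinsert.shuffle.parallelism output partitions.
def customColumnsSort(df: DataFrame, parallelism: Int = 20): DataFrame =
  df.sort("col1", "col2", "col3").coalesce(parallelism)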
→ GlobalSortPartitionerWithRows
hoodie.bulkinsert.sort.mode=GLOBAL_SORT
hoodie.bulkinsert.shuffle.parallelism=20
It does a sort (order by) on both the _hoodie_partition_path and _hoodie_record_key columns, and then a coalesce based on shuffle parallelism.
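As a sketch (not the Hudi source), the equivalent Spark transformation:
import org.apache.spark.sql.DataFrame

// Sketch: order by the Hudi meta columns, then coalesce to the shuffle parallelism.
def globalSort(df: DataFrame, parallelism: Int = 20): DataFrame =
  df.sort("_hoodie_partition_path", "_hoodie_record_key").coalesce(parallelism)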
→ PartitionPathRepartitionAndSortPartitionerWithRows
hoodie.bulkinsert.sort.mode=PARTITION_PATH_REPARTITION_AND_SORT
hoodie.bulkinsert.shuffle.parallelism=20
It does a repartition based on _hoodie_partition_path and then a sortWithinPartitions, again based on _hoodie_partition_path.
It seems weird to both repartition and sort; the docs explain:
this sort mode does an additional step of sorting the records based on the partition path within a single Spark partition, given that data for multiple physical partitions can be sent to the same Spark partition and executor. If data is skewed (most records are intended for a handful of partition paths among all) then this can cause an imbalance among Spark executors.
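In Spark terms, roughly (a sketch operating on the Hudi-prepared rows that carry the meta columns, not the actual Hudi code):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch: repartition by the Hudi partition path, then sort within each Spark partition
// on the same column so rows of a physical partition end up contiguous in each task.
def partitionPathRepartitionAndSort(df: DataFrame, parallelism: Int = 20): DataFrame =
  df.repartition(parallelism, col("_hoodie_partition_path"))
    .sortWithinPartitions(col("_hoodie_partition_path"))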
→ PartitionSortPartitionerWithRows
hoodie.bulkinsert.sort.mode=PARTITION_SORT
hoodie.bulkinsert.shuffle.parallelism=20
Same as PartitionPathRepartitionAndSortPartitionerWithRows, but no repartition before sorting.
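Which corresponds roughly to (again only a sketch based on the description above):
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch: sort within the existing Spark partitions, with no preceding repartition.
def partitionSort(df: DataFrame): DataFrame =
  df.sortWithinPartitions(col("_hoodie_partition_path"))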
→ z-order support
→ Sizing
- page size = 1_048_576 (1 MiB)
- row group size = 125_829_120 (120 MiB)
- parquet max file size = 125_829_120 (120 MiB)
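These appear to match the Hudi parquet write defaults; a hedged sketch of setting them explicitly (table name and path are assumptions):
import org.apache.spark.sql.{DataFrame, SaveMode}

// Sketch: overriding the parquet sizing knobs on a Hudi write, using the same values as above.
def writeWithSizing(df: DataFrame): Unit =
  df.write.format("hudi")
    .option("hoodie.table.name", "sizing_demo")           // assumed table name
    .option("hoodie.parquet.page.size", "1048576")        // 1 MiB pages
    .option("hoodie.parquet.block.size", "125829120")     // 120 MiB row groups
    .option("hoodie.parquet.max.file.size", "125829120")  // 120 MiB target base file size
    .mode(SaveMode.Append)
    .save("s3://bucket/sizing_demo")                      // assumed path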
→ Merge On read tables
- Athena is likely not optimized for MOR tables, since it scans the whole parquet files:
select * from test_hudi_mor_ro where part = 'C' and uuid=11
-- Execution time: 802 ms, Data scanned: 0 B, Approximate cost: $0.00
select * from test_hudi_mor_rt where part = 'C' and uuid=11
-- Execution time: 2569 ms, Data scanned: 425.39 KB, Approximate cost: $0.00
- spark cannot read _rt tables by datasource:
java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.PartitionedFile.<init>(Lorg/apache/spark/sql/catalyst/InternalRow;Ljava/lang/String;JJ[Ljava/lang/String;)V
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$2(MergeOnReadSnapshotRelation.scala:237)
  at scala.Option.map(Option.scala:230)
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.$anonfun$buildSplits$1(MergeOnReadSnapshotRelation.scala:235)
  at scala.collection.immutable.List.map(List.scala:293)
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.buildSplits(MergeOnReadSnapshotRelation.scala:231)
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:223)
  at org.apache.hudi.BaseMergeOnReadSnapshotRelation.collectFileSplits(MergeOnReadSnapshotRelation.scala:64)
  at org.apache.hudi.HoodieBaseRelation.buildScan(HoodieBaseRelation.scala:353)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$4(DataSourceStrategy.scala:360)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:394)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:473)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:393)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:360)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:63)
→ Stats columns
→ plugins
→ trino
- trino connector
- Trino plugin code
- MDT usage for partitions
- MDT stats data skipping
- rfc for unified plugins
- trino on hudi
→ Apache paimon
→ tla+/fizzbe model
→ table interoperability
→ xtable
XTable allows both one-shot and incremental sync between the three formats. It's then possible to read/write with the target format.