- Statistics optimisations
- Analyze table statistics
- Retrieve statistics from the metastore
- Joins
- Range join
- Register delta table in hive
- Partitioning
- Bucketing
- Partitioning + Bucketing
- Spark Streaming
- Structured streaming
- RDD Operations
- Catalyst
- SparkSession
- Show session configs
- Partition a table on disk
- Spark Plan
- Generate data
- Estimate dataframe size in bytes
- Cache
- Transform uuid as long
- ordered uuid
- Read partitioned table
- tuning
- videos
- Spark datasources
- udf java
- Memory
- Vocabulary
- web ui
- optimization
- Dates
- Skew joins
- Joins explained
- Spark infer partition type
- spark partition number
- Logging
- Bucketing
- Spark parquet
- Alluxio
- Spark parquet inspection
- streaming
- spark listener analyser
- graph
- Metastore configuration
- spark s3 committer
- stage barrier
- shuffle
- native function
- Options
- kylin spark backend
- connectors
- hive
- Parquet integration
- Spark case sensitive
- kubernetes
- Profiling
- Schemas and structure
- Codegen
#big-data #database #jvm #video
Spark
→ Statistics optimisations
By default, Spark does not exploit statistics:
spark.config("spark.sql.cbo.joinReorder.enabled", "true")
spark.config("spark.sql.cbo.enabled", "true")
spark.config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
spark.config("spark.sql.statistics.histogram.enabled", "true")
→ Analyze table statistics
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS")
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS FOR COLUMNS <col1>,<col2>")
Note that analyzing columns also updates the table-level statistics, so the first command is redundant if the second one is run.
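A quick way to check the computed statistics from Spark itself (a sketch; replace my_table and person_id with your own table and column):
// table-level stats (size, row count) appear in the Statistics row
spark.sql("DESCRIBE EXTENDED my_table").show(100, false)
// column-level stats (min, max, distinct count) for one column
spark.sql("DESCRIBE EXTENDED my_table person_id").show(100, false)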
→ Retrieve statistics from the metastore
select * from "TABLE_PARAMS" where "TBL_ID" = 34900;
34900 | spark.sql.statistics.colStats.person_id.max | 9454641234
34900 | spark.sql.statistics.colStats.person_id.maxLen | 8
34900 | spark.sql.statistics.colStats.person_id.avgLen | 8
34900 | spark.sql.statistics.colStats.person_id.version | 1
34900 | spark.sql.statistics.colStats.person_id.distinctCount | 8119231
34900 | spark.sql.statistics.colStats.person_id.min | 185429806
34900 | spark.sql.statistics.colStats.person_id.nullCount | 0
34900 | spark.sql.statistics.totalSize | 1738777345
34900 | spark.sql.statistics.numRows | 39720020
34900 | numFiles | 0
34900 | totalSize | 0
34900 | spark.sql.sources.schema.numParts | 1
34900 | EXTERNAL | TRUE
34900 | spark.sql.create.version | 2.4.3
34900 | spark.sql.sources.provider | delta
34900 | transient_lastDdlTime | 1583162364
The transient_lastDdlTime represents the timestamp of the last statistics update. It can be converted to a readable date with:
select to_timestamp(cast(1583162364 as bigint))::timestamp
to_timestamp
---------------------
2020-03-02 15:19:24
→ Joins
→ Range join
efficient range join — the idea is to bucket both columns and add the bucket equality as an extra join condition.
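A minimal sketch of that trick, assuming hypothetical dataframes points(ts) and ranges(start, end) holding epoch seconds, with a 1-hour bucket:
import org.apache.spark.sql.functions.{explode, floor, sequence}
val bucketSize = 3600L // 1 hour
// the point side gets a single bucket
val bucketedPoints = points.withColumn("bucket", floor($"ts" / bucketSize))
// a range may span several buckets, so explode it over every bucket it covers
val bucketedRanges = ranges.withColumn("bucket",
  explode(sequence(floor($"start" / bucketSize), floor($"end" / bucketSize))))
// the equi-join on bucket avoids a cartesian product; the range predicate filters within each bucket
val joined = bucketedPoints.join(bucketedRanges, Seq("bucket"))
  .where($"ts".between($"start", $"end"))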
→ Register delta table in hive
You can simply save the table like:
df.write.format("delta").saveAsTable("the_table")
You can also use an external table:
spark.range(10).write.format("delta").save("/tmp/events")
spark.sql("create table events using delta location '/tmp/events'")
spark.sql("select * from events").show(100)
It is also possible to directly write a delta table into the metastore:
df.write.format("delta").saveAsTable("myDeltaTable")
→ Partitioning
This creates one folder per distinct value of the partition column. Later, any filter on that column (in, =, rlike) lets Spark skip partitions.
WARNING: the partition column must not contain special Spark column characters such as upper case letters or dots. Otherwise, the filter will throw an exception.
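A minimal sketch (hypothetical dataframe df with a lower-case country column):
// one sub-folder per distinct value: /tmp/events_partitioned/country=FR/, country=US/, ...
df.write.format("parquet").partitionBy("country").save("/tmp/events_partitioned")
// partition pruning: only the country=FR folder is scanned
spark.read.parquet("/tmp/events_partitioned").where("country = 'FR'").show()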
→ Bucketing
It has two benefits and one drawback. The drawback is that it needs to shuffle and sort the data before writing it, which makes table creation much more expensive. The two benefits are:
- speed-up filters on the column (skips unused buckets)
- speed-up joins on the column
The best case is when both tables are bucketed on the same column and with the same number of buckets.
It is also possible to join a bucketed table to a table repartitioned on the fly. The latter has to be repartitioned into the same number of buckets and on the same column.
table1.repartition(100) // 20 * 100 parquet files
  .write.format("parquet")
  .bucketBy(20, "encounter") // 20 buckets
  .sortBy("encounter") // sorted
  .saveAsTable("bucketTable")
val notBucketTable = table2
  .repartition(20, $"encounter")
  .sortWithinPartitions("encounter")
spark.table("bucketTable")
  .join(notBucketTable, Seq("encounter"))
→ Partitioning + Bucketing
It is possible to use both partitioning and bucketing.
table1.repartition(100)
  .write.mode("overwrite")
  .partitionBy("typesimple")
  .bucketBy(20, "encounter")
  .sortBy("encounter")
  .saveAsTable("edsomop.encounterAphp")
→ Spark Streaming
A streaming context can be created from a sparkContext:
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
val lines = ssc.textFileStream("hdfs:///user/nparis/streamingRepository/*")
lines.foreachRDD { rdd =>
  val df = rdd.toDF // requires import spark.implicits._
  df.show
}
ssc.start
textFileStream watches the given directory for new text files and produces an RDD with one row per file line. A DStream object supports transformations such as filter, count, or map, which return a new DStream. The streaming process only begins when an "output operation" happens on the stream, such as print, foreachRDD, saveAsTextFile...
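For instance, on the lines stream above (the "ERROR" filter is an arbitrary example):
// filter and count are transformations (they return a new DStream);
// print is the output operation that makes the pipeline run once ssc.start is called
lines.filter(_.contains("ERROR")).count().print()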
→ Structured streaming
scaladoc of structured streaming wikibooks
Structured Streaming allows spark streaming to manipulate structured input/output such as parquet, csv, json or delta. It can also handle any output datasource thanks to the writeStream.foreach and foreachBatch methods.
// schema is necessary
val schema = spark.read.json("structuredStreaming").schema
val c = spark.readStream.schema(schema).json("structuredStreaming")
// checkpointing is necessary
spark.conf.set("spark.sql.streaming.checkpointLocation", "structuredStreamingCheckpoint")
c.writeStream.format("delta").option("path", "structuredStreamingDelta").start
// this alternative allows applying anything to the stream
c.writeStream.foreachBatch((b, i) => {
  println(i); b.show
}).start
Each batch adds a new parquet file with the content.
val in = spark.readStream.format("delta").option("ignoreChanges", "true").option("path", "structuredStreamingDelta").load
// update the input table with another process (requires import io.delta.tables.DeltaTable)
DeltaTable.forPath("structuredStreamingDelta").updateExpr("query = '91400+11457491401+54+rue+alphonse+daudet'", Map("query" -> "'daudet'"))
// this will resend all the data of every parquet file containing the updated row.
With a delta source, deletes are not handled. However, updates are: every parquet file rewritten by the delta ACID mechanism is sent again to the streamWriter. This causes duplicated records, which have to be handled by the sink. The main(?) advantage of using delta as a datasource for structured streaming is that vacuum processes keep the number of files down, which would otherwise grow batch after batch.
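One way to absorb those re-sent rows is an idempotent sink, for instance a delta MERGE keyed on a primary key inside foreachBatch. A minimal sketch, assuming an id key column and an existing delta table at /tmp/target:
import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
def upsertBatch(batch: DataFrame, batchId: Long): Unit =
  DeltaTable.forPath("/tmp/target").as("t")
    .merge(batch.dropDuplicates("id").as("s"), "t.id = s.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
in.writeStream.foreachBatch(upsertBatch _).start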
→ RDD Operations
pairRDD scaladoc: RDD transformations. Consider the RDD below:
val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)
→ groupByKey
data
.flatMap(line => line.split("="))
.map(word => (word, 1))
.groupByKey()
.mapValues(_.sum)
+-----+----+
| _1  | _2 |
+-----+----+
| bar |  3 |
| A   |  4 |
| B   |  1 |
| foo |  5 |
| C   |  1 |
| D   |  2 |
+-----+----+
→ reduceByKey
data
.flatMap(line => line.split("="))
.map(word => (word, 1))
.reduceByKey((x, y) => (x + y))
+-----+----+
| _1  | _2 |
+-----+----+
| bar |  3 |
| A   |  4 |
| B   |  1 |
| foo |  5 |
| C   |  1 |
| D   |  2 |
+-----+----+
→ aggregateByKey
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]
Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
+-----+----+
| _1  | _2 |
+-----+----+
| bar |  3 |
| foo |  5 |
+-----+----+
→ Catalyst
→ SparkSession
→ Show session configs
spark.sql("set").show()
→ Partition a table on disk
This generates test data:
# pip3 install fastparquet pandas numpy
def generate_data(n, seed):
    import pandas as pd
    import numpy as np
    np.random.seed(seed)
    np_array = np.random.random_sample(n)  # generates an array of n random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"/tmp/bucket/part-{str(seed).zfill(5)}.parquet")
from pyspark.sql.functions import expr
df = spark.read.parquet("/tmp/bucket").withColumn("part", expr("case when x > 0.5 then 'bob' else 'jim' end"))
# this creates multiple json files per partition folder (round-robin repartition)
df.repartition(1000).write.format("json").mode("overwrite").partitionBy("part").save("/tmp/df")
# this creates one json file per partition folder (repartition on the partition column)
df.repartition(1000, "part").write.format("json").mode("overwrite").partitionBy("part").save("/tmp/df")
→ Spark Plan
The spark plans can be accessed from the QueryExecution object.
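For instance, on a hypothetical dataframe df:
val qe = df.queryExecution
qe.logical // parsed logical plan
qe.analyzed // after analysis
qe.optimizedPlan // after Catalyst optimization
qe.executedPlan // physical SparkPlan
df.explain(true) // prints all of the above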
→ Generate data
val df = spark.range(0, 2000).selectExpr(
"id as _1",
"cast(id % 10 as byte) as _2",
"cast(id % 10 as short) as _3",
"cast(id % 10 as int) as _4",
"cast(id % 10 as float) as _5",
"cast(id % 10 as double) as _6",
"cast(id % 10 as decimal(20,0)) as _7",
"cast(id % 2 as boolean) as _8",
"cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
"cast(1618161925000 + (id % 10) as timestamp) as _10"
)
→ Estimate dataframe size in bytes
import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(b)
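Another option is to read Catalyst's own estimate of the plan output size (a sketch for a hypothetical dataframe df; API of recent Spark versions):
val estimatedBytes = df.queryExecution.optimizedPlan.stats.sizeInBytes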
→ Cache
Columnar cache:
spark.catalog.cacheTable("tableName") or dataFrame.cache()
spark.catalog.uncacheTable("tableName") or dataFrame.unpersist()
→ Transform uuid as long
Collisions are unlikely but possible (only the most significant 63 bits of the UUID are kept):
def stringToUUID(uuid: String): Long = {
  java.util.UUID.fromString(uuid).getMostSignificantBits() & java.lang.Long.MAX_VALUE
}
val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
d.withColumn("user_id_long", stringToUUIDUdf(expr("user_id"))).show
→ ordered uuid
→ Read partitionned table
spark.read
  .option("basePath", "/tmp/my_table") // will get field date
  .option("spark.sql.sources.partitionColumnTypeInference.enabled", "false") // won't infer date as date type
  .format("parquet")
  .load("/tmp/my_table/date=2022-01-01/*.parquet")
→ tuning
- Increase shuffle block size to improve compression efficiency
- several pieces of advice, including skew, shuffle and config
- shuffle blog
- performance
- shuffle
- GC
- RPC
→ videos
→ Spark datasources
→ Datasource v1
- BaseRelation: abstraction of a dataframe loaded from a datasource, provides the schema of the data
- RelationProvider: handles the user's options and creates a BaseRelation
- TableScan: reads the data and constructs the rows
→ Datasource v2
→ Datasource spark4
→ udf java
// import org.apache.spark.sql.types.DataTypes;
sqlContext.udf().register("strlen",
    (String s) -> s.length(), DataTypes.IntegerType);
→ Memory
- From spark cookbook, Rishi Yadav
- Heap is used for objects
- 60% is used for caching RDDs = storage memory
- 40% for spark computation buffers = execution memory
- execution can evict storage up to 50%; this is configured by spark.memory.storageFraction (see the sketch below)
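The corresponding knobs of the unified memory manager (sketch; the values shown are the documented defaults):
// fraction of (heap - 300MB) shared by execution and storage
sc.getConf.get("spark.memory.fraction", "0.6")
// part of the above reserved for storage, i.e. protected from eviction by execution
sc.getConf.get("spark.memory.storageFraction", "0.5")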
→ GC
- Use G1 for spark (vs CMS and the default GC)
- for streaming prefer CMS
- GC spark benchmark
spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
→ kryo
scala> sc.getConf
  .registerKryoClasses(Array(classOf[com.infoobjects.CustomClass1], classOf[com.infoobjects.CustomClass2]))
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryoserializer.buffer", "64m")
→ Vocabulary
- Spark produces one job per action
- a job contains one stage plus one other per wide transformation
- a stage contains several tasks, based on parallelism
→ web ui
http://web-ui/metrics/json produces all the metrics.
metrics.properties helps to monitor metrics.
![[Screenshot_20230501-225402_NewPipe.png]]
![[Screenshot_20230501-231933_NewPipe.png]]
![[Screenshot_20230501-222826_NewPipe.png]]
→ optimization
- spark.unsafe.offHeap
- enable spark.sql.codegen
- spark.sql.inMemoryColumnarStorage.compressed
- spark.cleaner.ttl: in seconds, to clean up any metadata (stages generated, tasks generated, and so on)
- spark.default.parallelism: use 3 tasks per CPU core overall
- spark.shuffle.manager=tungsten-sort (from spark in action)
- bunch of optims here
→ Dates
- When writing timestamp values out to non-text data sources like Parquet, the values are just instants (like timestamp in UTC) that have no time zone information
A sane timestamp behavior for Spark is not to use the int96-based type, since it is deprecated in parquet.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
There is some context here
→ Skew joins
→ Joins explained
- shuffle sort merge join
- shuffle hash join
- broadcast hash join
- broadcast nested loop
- bloom filter join hint
- bloom filter join
- storage partition join
To use a shuffle hash join, setting spark.sql.join.preferSortMergeJoin=false is not enough; use df1.join(df2.hint("shuffle_hash"), ....) (see the sketch after this list)
- Runtime filter
- row level runtime filter also
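A sketch of the join strategy hints (df1, df2 and the id key are hypothetical; the merge and shuffle_hash hints exist since Spark 3.0):
import org.apache.spark.sql.functions.broadcast
df1.join(df2.hint("shuffle_hash"), Seq("id")) // prefer a shuffle hash join
df1.join(broadcast(df2), Seq("id")) // force a broadcast hash join
df1.join(df2.hint("merge"), Seq("id")) // prefer a sort merge join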
→ Spark infer partition type
see spark.sql.sources.partitionColumnTypeInference.enabled
→ spark partition number
// avoids transforming the DF into an RDD
df.queryExecution.executedPlan.execute().partitions.length
→ Logging
add -Dlog4j.configuration=log4j.xml to spark.executor.extraJavaOptions
so the executors pick it up.
→ Bucketing
Spark bucketing in hive metastore:
spark.sql.sources.schema.bucketCol.0 | tbl_id
spark.sql.sources.provider | parquet
numFiles | 2
spark.sql.sources.schema.numBuckets | 4
spark.sql.create.version | 3.2.3
spark.sql.sources.schema | {"type":"struct","fields":[{"name":"tbl_id","type":"long","nullable":true,"metadata":{}},{"name":"kafka_datetime","type":"timestamp","nullable":true,"metadata":{}},{"name":"event_timesta
:"timestamp","nullable":true,"metadata":{}},{"name":"version","type":"string","nullable":true,"metadata":{}},{"name":"event_date","type":"string","nullable":true,"metadata":{}},{"name":"event_hour","type":"string","nullable":true
":{}}]}
totalSize | 3824
spark.sql.sources.schema.sortCol.0 | event_timestamp
spark.sql.sources.schema.numBucketCols | 1
spark.sql.sources.schema.numSortCols | 1
transient_lastDdlTime | 1683402911
- spark bucketing article
- hive / spark bucketing
- trino support
- athena v3 support
- hudi bucket index support bulk insert
- hudi hash index rfc
- hudi hash index pr
- Hudi consistent hash index
→ Spark parquet
→ Alluxio
→ Spark parquet inspection
pip3 install parquet-tools
parquet-tools inspect <file.parquet>
→ streaming
→ spark listener analyser
→ graph
→ Metastore configuration
- according to this doc, both jdbc and thrift can be used to connect spark to HMS
- using only database for metastore
→ spark s3 committer
- the default committer writes first to _temporary, then moves files sequentially from the driver to the target folder
- the _magic committer (spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled) uses a magic protocol to write directly within the target through a __magic folder
- you must set s3a as the prefix
→ stage barrier
→ shuffle
→ native function
→ Options
Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file.
- Number of rows per file: spark.sql.files.maxRecordsPerFile (sketch below)
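Sketch: it can be set session-wide or per writer (the 1000000 value is arbitrary):
spark.conf.set("spark.sql.files.maxRecordsPerFile", "1000000")
df.write.option("maxRecordsPerFile", 1000000).parquet("/tmp/capped")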
→ kylin spark backend
→ connectors
→ hive
→ Parquet integration
→ Spark case sensitive
→ kubernetes
→ memory
-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
spark.executor.memoryOverhead
spark.memory.offHeap.enabled=true
spark.kubernetes.memoryOverheadFactor=0.2
About xms on yarn and k8s: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/781#issuecomment-1250345153
→ service account
The driver needs a role to create executors. It uses the default service account of the namespace for that, unless spark.kubernetes.authenticate.driver.serviceAccountName or spark.kubernetes.authenticate.serviceAccountName is configured.
→ Profiling
spark.sparkContext.getMemoryStatus().inUse()
If you are planning to run Spark for a long time on a cluster, you may wish to enable spark.cleaner.ttl. By default, Spark does not clean up any metadata (stages generated, tasks generated, and so on); set this to a non-zero value in seconds to clean up the metadata after that length of time.
- Overhead memory -> spark.executor.memoryOverhead
- Heap Memory -> spark.executor.memory
- Off Heap Memory -> spark.memory.offHeap.size
- Pyspark Memory -> spark.executor.pyspark.memory
Off-heap memory is a great way to reduce GC pauses because it is not in the GC's scope. However, it brings a serialization and deserialization overhead, and deserialization can sometimes bring the off-heap data back onto the heap, exposing it to GC. Also, the new data format brought by Project Tungsten (array of bytes) already helps reduce the GC overhead. For these two reasons, the use of off-heap memory in Apache Spark applications should be carefully planned and, especially, tested.
→ file path
One can access the file path from the _metadata field or the dedicated input_file_name() function. See
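A sketch of both approaches (the _metadata column requires a recent Spark version):
import org.apache.spark.sql.functions.{col, input_file_name}
// dedicated function
spark.read.parquet("/tmp/my_table").select(input_file_name().as("path"))
// hidden _metadata column of file sources
spark.read.parquet("/tmp/my_table").select(col("_metadata.file_path"))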
→ spark schema from avro schema
→ Schemas and structure
→ set nullable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{StructField, StructType}

def setNullableStateForAllColumns(df: DataFrame, nullable: Boolean): DataFrame = {
  def set(st: StructType): StructType = {
    StructType(st.map {
      case StructField(name, dataType, _, metadata) =>
        val newDataType = dataType match {
          case t: StructType => set(t)
          case _ => dataType
        }
        StructField(name, newDataType, nullable = nullable, metadata)
    })
  }
  df.sqlContext.createDataFrame(df.rdd, set(df.schema))
}