#big-data #database #jvm #video

Spark

Statistics optimisations

By default, Spark does not exploit statistics:

spark.config("spark.sql.cbo.joinReorder.enabled", "true")
spark.config("spark.sql.cbo.enabled", "true")
spark.config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
spark.config("spark.sql.statistics.histogram.enabled", "true")

Analyze table statistics

spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS")
spark.sql(s"ANALYZE TABLE <table> COMPUTE STATISTICS FOR COLUMNS <col1>,<col2>")

Note that analyzing for columns also updates the whole-table statistics, so the first statement is redundant when the second one is run.
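
A minimal sketch (my_table is a hypothetical table name) for checking from Spark that the computed statistics are visible to the optimizer:

// quick check that the statistics are picked up by the optimizer
val stats = spark.table("my_table").queryExecution.optimizedPlan.stats
println(stats.sizeInBytes) // fed by spark.sql.statistics.totalSize
println(stats.rowCount)    // Some(n) once ANALYZE TABLE has been run (and CBO is enabled)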

Retrieve the statistics from the metastore

select * from "TABLE_PARAMS" where  "TBL_ID" = 34900;

  34900 | spark.sql.statistics.colStats.person_id.max           | 9454641234
  34900 | spark.sql.statistics.colStats.person_id.maxLen        | 8
  34900 | spark.sql.statistics.colStats.person_id.avgLen        | 8
  34900 | spark.sql.statistics.colStats.person_id.version       | 1
  34900 | spark.sql.statistics.colStats.person_id.distinctCount | 8119231
  34900 | spark.sql.statistics.colStats.person_id.min           | 185429806
  34900 | spark.sql.statistics.colStats.person_id.nullCount     | 0
  34900 | spark.sql.statistics.totalSize                        | 1738777345
  34900 | spark.sql.statistics.numRows                          | 39720020
  34900 | numFiles                                              | 0
  34900 | totalSize                                             | 0
  34900 | spark.sql.sources.schema.numParts                     | 1
  34900 | EXTERNAL                                              | TRUE
  34900 | spark.sql.create.version                              | 2.4.3
  34900 | spark.sql.sources.provider                            | delta
  34900 | transient_lastDdlTime                                 | 1583162364

The transient_lastDdlTime represents the timestamp of the last statistics update. It can be converted with:

select to_timestamp(cast(1583162364 as bigint))::timestamp

    to_timestamp
---------------------
 2020-03-02 15:19:24

Joins

good article about joins

Range join

efficient range join: the idea is to bucket both columns and add the bucket equality as an extra join condition.
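
A minimal sketch of the trick (the column names, bucket size, and sample data are illustrative assumptions): assigning each point to a bucket and exploding each range over the buckets it covers turns the pure range join into an equi-join with a residual filter.

import org.apache.spark.sql.functions._
import spark.implicits._

val bucketSize = 3600L // 1-hour buckets, assuming epoch-second timestamps

// points: one timestamp per row; ranges: [start, end) intervals
val points = Seq(1000L, 5000L, 9000L).toDF("ts")
val ranges = Seq((0L, 4000L), (4000L, 10000L)).toDF("start", "end")

// assign each point to a bucket, and explode each range into every bucket it covers
val bucketedPoints = points.withColumn("bucket", floor($"ts" / bucketSize))
val bucketedRanges = ranges.withColumn(
  "bucket",
  explode(sequence(floor($"start" / bucketSize), floor($"end" / bucketSize)))
)

// the equality on "bucket" lets Spark plan an equi-join; the range predicate becomes a residual filter
bucketedPoints
  .join(bucketedRanges, Seq("bucket"))
  .where($"ts" >= $"start" && $"ts" < $"end")
  .show()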

Register a delta table in Hive

You can simply save the table like:

df.write.format("delta").saveAsTable("the_table")

You can also use an external table:

spark.range(10).write.format("delta").save("/tmp/events")
spark.sql("create table events using delta location '/tmp/events'")
spark.sql("select * from events").show(100)

It is also possible to directly write a delta table into the metastore:

df.write.format("delta").saveAsTable("myDeltaTable")

Partitioning

This creates one folder per distinct value of the partition column. Later, any filter on that column (in, =, rlike) allows Spark to skip partitions.

WARNING: the partition column name must not contain characters that are special to Spark column names, such as upper case letters or dots. Otherwise, the filter will throw an exception.
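
A minimal sketch of partition pruning (the country column and paths are illustrative assumptions):

import spark.implicits._

val df = Seq(("FR", 1), ("US", 2), ("DE", 3)).toDF("country", "n")
df.write.format("parquet").mode("overwrite").partitionBy("country").save("/tmp/by_country") // one folder per country value

spark.read.parquet("/tmp/by_country")
  .where($"country".isin("FR", "US")) // only the country=FR and country=US folders are scanned
  .show()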

Bucketing

see

Bucketing has benefits and one drawback. The drawback is that it requires shuffling and sorting the data before writing it, which makes creating the table much more expensive. The benefits show up at join time:

The best case is when both tables are bucketed with the same number of buckets and on the same bucketing column: the join then needs no shuffle at all.

It is also possible to join a single bucketed table to an on-the-fly repartitioned table. The latter has to be repartitioned into the same number of partitions as there are buckets, and on the same column.

table1.repartition(100)        // up to 100 * 20 parquet files
  .write.format("parquet")
  .bucketBy(20, "encounter")   // 20 buckets
  .sortBy("encounter")         // sorted within each bucket
  .saveAsTable("bucketTable")

import spark.implicits._ // for the $-notation

val notBucketTable = table2
  .repartition(20, $"encounter")
  .sortWithinPartitions("encounter")

spark.table("bucketTable")
  .join(notBucketTable, Seq("encounter"))

Partitioning + Bucketing

It is possible to use both partitioning and bucketing.

table1.repartition(100)
  .write.mode("overwrite")
  .partitionBy("typesimple")
  .bucketBy(20, "encounter")
  .sortBy("encounter")
  .saveAsTable("edsomop.encounterAphp")

Spark Streaming

A streaming context can be created from a sparkContext:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import spark.implicits._ // for rdd.toDF

val ssc = new StreamingContext(spark.sparkContext, Seconds(20))
val lines = ssc.textFileStream("hdfs:///user/nparis/streamingRepository/*")
lines.foreachRDD { rdd =>
  val df = rdd.toDF
  df.show
}
ssc.start

textFileStream picks up the new text files appearing in a given directory and produces an RDD with one row per line of those files.

A DStream object supports transformations such as filter, count, or map, which return a new DStream.

The streaming process only begins when an "output operation" is applied to the stream, such as print, foreachRDD, saveAsTextFile...

Structured streaming

scaladoc of structured streaming wikibooks

Structured Streaming lets Spark streaming manipulate structured input/output such as parquet, csv, json, or delta. It can also handle any output datasource thanks to the writeStream.foreach and foreachBatch methods.

// the schema is necessary for a file-based stream source
val schema = spark.read.json("structuredStreaming").schema
val c = spark.readStream.schema(schema).json("structuredStreaming")
// checkpointing is necessary
spark.conf.set("spark.sql.streaming.checkpointLocation", "structuredStreamingCheckpoint")
c.writeStream.format("delta").option("path", "structuredStreamingDelta").start
// this alternative allows applying arbitrary logic to each micro-batch
c.writeStream.foreachBatch { (b: org.apache.spark.sql.DataFrame, i: Long) =>
  println(i)
  b.show
}.start

Each batch adds a new parquet file with its content.

delta structured streaming

import io.delta.tables.DeltaTable

val in = spark.readStream.format("delta").option("ignoreChanges", "true").option("path", "structuredStreamingDelta").load

// update the input table from another process
DeltaTable.forPath("structuredStreamingDelta").updateExpr(
  "query = '91400+11457491401+54+rue+alphonse+daudet'",
  Map("query" -> "'daudet'"))

// this will resend all the data of every parquet file containing the updated row

With a delta source, deletes are not handled. Updates are, however: every parquet file rewritten by the delta ACID mechanism is sent again to the stream writer. This produces duplicated records, which have to be handled by the sink.

The main(?) advantage of using delta as a datasource for structured streaming is that vacuum processes keep the number of files under control, whereas it would otherwise grow batch after batch.
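
A minimal sketch of handling those duplicates in the sink, assuming a unique id column and an existing target delta table at the hypothetical path /tmp/dedupTarget, by merging each micro-batch instead of appending it:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame

def upsertBatch(batch: DataFrame, batchId: Long): Unit = {
  DeltaTable.forPath("/tmp/dedupTarget").as("t")
    .merge(batch.dropDuplicates("id").as("s"), "t.id = s.id")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

// re-sent rows simply overwrite themselves instead of creating duplicates
in.writeStream.foreachBatch(upsertBatch _).start()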

RDD Operations

pairRDD scaladoc, RDD transformations. Consider the RDD below:

val keysWithValuesList = Array("foo=A", "foo=A", "foo=A", "foo=A", "foo=B", "bar=C", "bar=D", "bar=D")
val data = sc.parallelize(keysWithValuesList)

groupByKey

data
  .flatMap(line => line.split("="))
  .map(word => (word, 1))
  .groupByKey()
  .mapValues(_.sum)
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|  A|  4|
|  B|  1|
|foo|  5|
|  C|  1|
|  D|  2|
+---+---+

reduceByKey

data
  .flatMap(line => line.split("="))
  .map(word => (word, 1))
  .reduceByKey((x, y) => (x + y))
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|  A|  4|
|  B|  1|
|foo|  5|
|  C|  1|
|  D|  2|
+---+---+

aggregateByKey

def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): RDD[(K, U)]

Aggregate the values of each key, using given combine functions and a neutral "zero value". This function can return a different result type, U, than the type of the values in this RDD, V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, as in scala.TraversableOnce. The former operation is used for merging values within a partition, and the latter is used for merging values between partitions. To avoid memory allocation, both of these functions are allowed to modify and return their first argument instead of creating a new U.
val kv = data.map(_.split("=")).map(v => (v(0), v(1))).cache()
val initialCount = 0;
val addToCounts = (n: Int, v: String) => n + 1
val sumPartitionCounts = (p1: Int, p2: Int) => p1 + p2
val countByKey = kv.aggregateByKey(initialCount)(addToCounts, sumPartitionCounts)
+---+---+
| _1| _2|
+---+---+
|bar|  3|
|foo|  5|
+---+---+

Catalyst

SparkSession

spark concurrent fast queries

Show session configs

spark.sql("set").show()

Partition a table on disk

This generates test data:

# pip3 install fastparquet pandas numpy
def generate_data(n, seed):
    import pandas as pd
    import numpy as np
    np.random.seed(seed)
    np_array = np.random.random_sample(n)  # generates an array of n random values
    df = pd.DataFrame(np_array, columns=["x"])
    df["seed"] = seed
    df.reset_index().to_parquet(f"/tmp/bucket/part-{str(seed).zfill(5)}.parquet")


from pyspark.sql.functions import expr

df = spark.read.parquet("/tmp/bucket").withColumn("part", expr("case when x > 0.5 then 'bob' else 'jim' end"))

# this creates multiple json files per partition folder
df.repartition(1000).write.format("json").mode("overwrite").partitionBy("part").save("/tmp/df")
# this creates one json file per partition folder
df.repartition(1000, "part").write.format("json").mode("overwrite").partitionBy("part").save("/tmp/df")

Spark Plan

The Spark plans can be accessed from the QueryExecution object.

see
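
A minimal sketch of reading the different plans from the QueryExecution object:

val q = spark.range(10).filter("id % 2 = 0")
q.queryExecution.logical        // parsed/analyzed logical plan
q.queryExecution.optimizedPlan  // after Catalyst optimization
q.queryExecution.executedPlan   // physical (Spark) plan
println(q.queryExecution)       // all of the above, formatted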

Generate data

val df = spark.range(0, 2000).selectExpr(
  "id as _1",
  "cast(id % 10 as byte) as _2",
  "cast(id % 10 as short) as _3",
  "cast(id % 10 as int) as _4",
  "cast(id % 10 as float) as _5",
  "cast(id % 10 as double) as _6",
  "cast(id % 10 as decimal(20,0)) as _7",
  "cast(id % 2 as boolean) as _8",
  "cast(cast(1618161925000 + (id % 10) * 1000 * 60 * 60 * 24 as timestamp) as date) as _9",
  "cast(1618161925000 + (id % 10) as timestamp) as _10"
)

Estimate dataframe size in bytes

import org.apache.spark.util.SizeEstimator
SizeEstimator.estimate(df) // estimated JVM object size of df

Cache

Columnar cache:

spark.catalog.cacheTable("tableName") or dataFrame.cache()
spark.catalog.uncacheTable("tableName") or dataFrame.unpersist()

Transform a uuid into a long

In practice no collision was observed (only 63 of the UUID's 128 bits are kept, so collisions remain theoretically possible):

import org.apache.spark.sql.functions.{expr, udf}

def stringToUUID(uuid: String): Long =
  java.util.UUID.fromString(uuid).getMostSignificantBits() & java.lang.Long.MAX_VALUE

val stringToUUIDUdf = udf((uuid: String) => stringToUUID(uuid))
d.withColumn("user_id_long", stringToUUIDUdf(expr("user_id"))).show

ordered uuid

Read a partitioned table

spark.read
  .option("basePath", "/tmp/my_table") // recovers the partition field "date"
  .option("spark.sql.sources.partitionColumnTypeInference.enabled", "false") // won't infer date as a date type
  .format("parquet")
  .load("/tmp/my_table/date=2022-01-01/*.parquet")

tuning

videos

Spark datasources

Datasource v1

  1. BaseRelation: abstraction of a dataframe loaded from a datasource; provides the schema of the data
  2. RelationProvider: handles the user's options and creates a BaseRelation
  3. TableScan: reads the data and constructs the rows
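
A minimal sketch of these three v1 interfaces, assuming a hypothetical com.example.range package and a toy datasource that just produces n rows:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// RelationProvider: receives the user's options and builds the BaseRelation
class DefaultSource extends RelationProvider {
  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String]): BaseRelation =
    new RangeRelation(sqlContext, parameters.getOrElse("n", "10").toInt)
}

// BaseRelation + TableScan: exposes the schema and produces the rows
class RangeRelation(val sqlContext: SQLContext, n: Int)
    extends BaseRelation with TableScan {
  override def schema: StructType = StructType(Seq(StructField("id", IntegerType)))
  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.parallelize(0 until n).map(Row(_))
}

// usage: the format name is the package containing DefaultSource
// spark.read.format("com.example.range").option("n", "5").load().show()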

Datasource v2

udf java

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

sqlContext.udf().register("strlen",
        (UDF1<String, Integer>) s -> s.length(),
        DataTypes.IntegerType); // strlen returns an Integer, not a String

Memory

GC

kryo

sc.getConf
  .registerKryoClasses(Array(classOf[com.infoobjects.CustomClass1], classOf[com.infoobjects.CustomClass2]))
  .set("spark.kryoserializer.buffer.max", "128m")
  .set("spark.kryoserializer.buffer", "64m")

Vocabulary

web ui

![[Screenshot_20230501-231933_NewPipe.png]]

![[Screenshot_20230501-222826_NewPipe.png]]

optimization

Dates

Skew joins

Joins explained

Spark infer partition type

see spark.sql.sources.partitionColumnTypeInference.enabled
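
A minimal sketch (hypothetical path) of disabling the inference so that partition values stay strings:

// partition directory values (e.g. date=2022-01-01) are read as strings instead of being cast
spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
spark.read.parquet("/tmp/my_table").printSchema() // assuming /tmp/my_table is partitioned by "date"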

Logging

Add -Dlog4j.configuration=log4j.xml to spark.executor.extraJavaOptions so the executors pick it up.
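
A minimal sketch (client mode, assuming log4j.xml has been shipped to the executors, e.g. with spark-submit --files log4j.xml):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("logging-demo")
  .config("spark.executor.extraJavaOptions", "-Dlog4j.configuration=log4j.xml")
  .getOrCreate()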

Bucketing

Spark bucketing in hive metastore:

spark.sql.sources.schema.bucketCol.0   | tbl_id
spark.sql.sources.provider             | parquet
numFiles                               | 2
spark.sql.sources.schema.numBuckets    | 4
spark.sql.create.version               | 3.2.3
spark.sql.sources.schema               | {"type":"struct","fields":[{"name":"tbl_id","type":"long","nullable":true,"metadata":{}},{"name":"kafka_datetime","type":"timestamp","nullable":true,"metadata":{}},{"name":"event_timestamp","type":"timestamp","nullable":true,"metadata":{}},{"name":"version","type":"string","nullable":true,"metadata":{}},{"name":"event_date","type":"string","nullable":true,"metadata":{}},{"name":"event_hour","type":"string","nullable":true,"metadata":{}}]}
totalSize                              | 3824
spark.sql.sources.schema.sortCol.0     | event_timestamp
spark.sql.sources.schema.numBucketCols | 1
spark.sql.sources.schema.numSortCols   | 1
transient_lastDdlTime                  | 1683402911

Spark parquet

Alluxio

Spark parquet inspection

pip3 install parquet-tools
parquet-tools inspect <file.parquet>

streaming

spark listener analyser

Spark uber optim

graph

Metastore configuration

spark s3 commiter

stage barrier

shuffle

native function

Options

Options precedence:

Properties set directly on the SparkConf take highest precedence, then flags passed to spark-submit or spark-shell, then options in the spark-defaults.conf file.

kylin spark backend

connectors

hive

Parquet integration

Spark case sensitive

kubernetes

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

spark.executor.memoryOverhead

spark.memory.offHeap.enabled = true

spark.kubernetes.memoryOverheadFactor = 0.2

About Xms on YARN and k8s: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/issues/781#issuecomment-1250345153

Profiling

If you are planning to run Spark for a long time on a cluster, you may wish to enable spark.cleaner.ttl. By default, Spark does not clean up any metadata (stages generated, tasks generated, and so on); set this to a non-zero value in seconds to clean up the metadata after that length of time.

  1. Overhead memory -> spark.executor.memoryOverhead
  2. Heap Memory -> spark.executor.memory
  3. Off Heap Memory -> spark.memory.offHeap.size
  4. Pyspark Memory -> spark.executor.pyspark.memory
Off-heap memory is a great way to reduce GC pauses because it is outside the GC's scope. However, it brings serialization and deserialization overhead, and deserialized off-heap data can end up back on the heap, where it is again exposed to GC. The new data format brought by Project Tungsten (arrays of bytes) also helps to reduce GC overhead. For these two reasons, the use of off-heap memory in Spark applications should be carefully planned and, above all, tested.
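
A minimal sketch (the application name and sizes are illustrative assumptions, not recommendations) of where each of the four regions listed above is configured:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("memory-regions-demo")
  .config("spark.executor.memory", "4g")            // 2. heap memory
  .config("spark.executor.memoryOverhead", "1g")    // 1. overhead memory
  .config("spark.memory.offHeap.enabled", "true")
  .config("spark.memory.offHeap.size", "2g")        // 3. off-heap memory
  .config("spark.executor.pyspark.memory", "1g")    // 4. pyspark memory (PySpark jobs only)
  .getOrCreate()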

file path

One can access the file path from the _metadata field or from the dedicated input_file_name function. See
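
A minimal sketch, assuming a parquet dataset at the hypothetical path /tmp/my_table; the hidden _metadata column is only available for file sources in recent Spark versions:

import org.apache.spark.sql.functions.input_file_name

val files = spark.read.parquet("/tmp/my_table")
files.select(input_file_name().as("path_from_function")).show(false) // dedicated function
files.select("_metadata.file_path").show(false)                      // hidden _metadata column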

React ?
