Kafka
→ Kafka Streams
- Documentation
- State store (aka KTable) can be tuned; see the RocksDBConfigSetter sketch after this list
- how RocksDB works
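A minimal sketch of tuning the RocksDB-backed state stores via Kafka Streams' RocksDBConfigSetter hook (the class name and values below are illustrative, not a recommendation):

import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Illustrative tuning hook: Kafka Streams calls setConfig() for every RocksDB-backed store it opens.
public class CustomRocksDBConfig implements RocksDBConfigSetter {
  @Override
  public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
    // Example knobs: shrink the memtable (flush threshold) and cap the number of memtables.
    options.setWriteBufferSize(32 * 1024 * 1024L); // 32 MB instead of the 64 MB default
    options.setMaxWriteBufferNumber(2);
  }

  @Override
  public void close(final String storeName, final Options options) {
    // Nothing allocated in setConfig() needs explicit disposal in this sketch.
  }
}

// Wiring: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);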
→ About RocksDB
- forked or used as a storage engine by other databases (CockroachDB, YugabyteDB)
- forked from LevelDB
- maintained by Meta
- written in C++, with bindings for Java, Rust, and Go
- embeddable database
- key/value pairs, more exactly byte-array pairs
- get/put/merge/delete, plus an iterator for scans (see the sketch after this list)
- LSM-Tree
- memtable buffers incoming writes until the 64 MB limit is reached
- WAL (Write-Ahead Log) to avoid data loss during a crash
- SST (Static Sorted Table) files can be compressed (Snappy, Zstd, gzip, ...)
- offset/index map (to allow binary search within a compressed file)
- optional Bloom filter: to quickly rule out keys that don't exist
- space/read amplification: each flush to disk adds files to be merged
- compaction creates a new level
- compaction can cascade across levels
- k-way merge strategy (to merge multiple levels of files)
- reads traverse the whole hierarchy (memtables -> all level-0 files -> the target file in each level N -> Bloom filter/index, then read the block)
- merge operation (= read-modify-write): a thread-safe combination of read + put + delete
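A minimal sketch of the API above using the Java binding (the path, keys and values are illustrative; merge requires a merge operator, here the built-in StringAppendOperator):

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.StringAppendOperator;

public class RocksDbDemo {
  public static void main(String[] args) throws RocksDBException {
    RocksDB.loadLibrary();
    try (Options options = new Options()
             .setCreateIfMissing(true)
             .setMergeOperator(new StringAppendOperator()); // needed for merge() below
         RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo")) {

      db.put("user:1".getBytes(), "alice".getBytes());                 // put
      System.out.println(new String(db.get("user:1".getBytes())));    // get

      db.merge("tags:1".getBytes(), "red".getBytes());                 // merge = read-modify-write,
      db.merge("tags:1".getBytes(), "blue".getBytes());                // here: append with a separator
      System.out.println(new String(db.get("tags:1".getBytes())));

      db.delete("user:1".getBytes());                                  // delete

      try (RocksIterator it = db.newIterator()) {                      // iterator for scans
        for (it.seekToFirst(); it.isValid(); it.next()) {
          System.out.println(new String(it.key()) + " -> " + new String(it.value()));
        }
      }
    }
  }
}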
→ Kafka connect
→ secrets
There is a mechanism to provide/implement secrets in Kafka Connect (the ConfigProvider interface):
We could easily provide an EnvConfigProvider which looks up values in Linux environment variables.
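A minimal sketch of such a provider, assuming a custom class named EnvConfigProvider built on the standard ConfigProvider interface and config.providers worker settings:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.provider.ConfigProvider;

// Resolves placeholders like ${env::MY_SECRET} from environment variables.
public class EnvConfigProvider implements ConfigProvider {

  @Override
  public void configure(Map<String, ?> configs) { }

  @Override
  public ConfigData get(String path) {
    // No sensible "list everything under a path" for env vars: return an empty map.
    return new ConfigData(new HashMap<>());
  }

  @Override
  public ConfigData get(String path, Set<String> keys) {
    Map<String, String> data = new HashMap<>();
    for (String key : keys) {
      String value = System.getenv(key);
      if (value != null) {
        data.put(key, value);
      }
    }
    return new ConfigData(data);
  }

  @Override
  public void close() { }
}

// Worker config (illustrative):
//   config.providers=env
//   config.providers.env.class=com.example.EnvConfigProvider
// A connector config can then reference ${env::MY_DB_PASSWORD}.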
→ s3 connector
→ Kafka auth
→ Kafka Streams
→ Topology
import org.apache.kafka.streams.StreamsBuilder;
final StreamsBuilder builder = new StreamsBuilder();
// [...]
System.out.println(builder.build().describe());
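A fuller sketch with a trivial pipeline (the topic names are illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

public class TopologyDemo {
  public static void main(String[] args) {
    final StreamsBuilder builder = new StreamsBuilder();

    // Read, uppercase the value, write.
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
           .mapValues(value -> value.toUpperCase())
           .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));

    // describe() lists sub-topologies, processors and the topics/stores they are wired to.
    final Topology topology = builder.build();
    System.out.println(topology.describe());
  }
}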
→ Avro
→ resources
In particular:
// Reads an Avro container file (with embedded header/blocks) from a byte array.
def read(bytes: Array[Byte],
         schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
  import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
  val datumReader = new GenericDatumReader[GenericRecord](schema)
  val inputStream = new SeekableByteArrayInput(bytes)
  val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)
  import scala.collection.JavaConverters._
  val list = dataFileReader.iterator().asScala.toList
  dataFileReader.close()
  list
}
// Reads raw Avro binary (no container-file header), decoding records until the stream ends.
def readBinary(bytes: Array[Byte],
               schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
  import org.apache.avro.file.SeekableByteArrayInput
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
  import org.apache.avro.io.DecoderFactory
  import scala.collection.mutable
  val datumReader = new GenericDatumReader[GenericRecord](schema)
  val inputStream = new SeekableByteArrayInput(bytes)
  val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
  val result = mutable.ListBuffer.empty[GenericRecord]
  while (!decoder.isEnd) {
    val item = datumReader.read(null, decoder)
    result += item
  }
  result.toList
}
→ type promotion
Avro type promotion support (from the spec); see the sketch after this list:
int is promotable to long, float, or double
long is promotable to float or double
float is promotable to double
string is promotable to bytes
bytes is promotable to string
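A minimal sketch of promotion during schema resolution, assuming an illustrative Event record: a value written as int is read back as long by giving the datum reader both the writer and the reader schema.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class PromotionDemo {
  public static void main(String[] args) throws Exception {
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"count\",\"type\":\"int\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":[{\"name\":\"count\",\"type\":\"long\"}]}");

    // Write a record with the int writer schema.
    GenericRecord record = new GenericData.Record(writerSchema);
    record.put("count", 42);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(record, encoder);
    encoder.flush();

    // Read it back with the long reader schema: int is promoted to long.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord read = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema).read(null, decoder);
    System.out.println(read.get("count")); // 42 as a java.lang.Long
  }
}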
→ avro-Parquet
- the parquet struct derives from the avro root field of type record
- a record has a name (record_name), a namespace, a doc, and a list of fields
- when the avro root field is of type union [null, record] or just record, the resulting parquet struct is root<field1, field2> and not root<record_name<field1, field2>>
- when the avro root field is of type union [record, any_other_but_null], the resulting parquet struct is root<member0<field1, field2>, member1<...>> and not root<field1, field2> (see the conversion sketch after this list)
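A sketch to inspect such mappings, assuming the parquet-avro module is on the classpath (its AvroSchemaConverter converts an Avro record schema into a Parquet MessageType); the schema below is illustrative:

import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

public class AvroToParquetSchemaDemo {
  public static void main(String[] args) {
    // A root record with two fields; the converted Parquet message lists field1/field2 directly.
    Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"record_name\",\"namespace\":\"com.example\","
            + "\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}");

    MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
    System.out.println(parquetSchema); // prints the Parquet message/group structure
  }
}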
→ Spark kafka
→ producers
→ Kafka transactions
For 1 KB messages and transactions lasting 100 ms, the producer throughput declines only by 3% compared to the throughput of a producer configured for at-least-once, in-order delivery (acks=all, max.in.flight.requests.per.connection=1), and by 20% compared to the throughput of a producer configured for at-most-once delivery with no ordering guarantees (acks=1, max.in.flight.requests.per.connection=5), which is the current default. Smaller messages or shorter transaction commit intervals would result in more severe degradation.
the transactional consumer shows no degradation in throughput when reading transactional messages in read_committed mode
the main tradeoff when increasing the transaction duration is that it increases end-to-end latency. Recall that a consumer reading transactional messages will not deliver messages which are part of open transactions
a Kafka topic partition might have some messages that are part of a transaction while others are not.
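A minimal transactional-producer sketch (the broker address, topic and transactional.id are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.serialization.StringSerializer;

public class TxProducerDemo {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-tx-producer"); // also enables idempotence

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      producer.initTransactions();    // register the transactional.id, fence older instances
      try {
        producer.beginTransaction();
        producer.send(new ProducerRecord<>("payments", "key-1", "value-1"));
        producer.send(new ProducerRecord<>("payments", "key-2", "value-2"));
        producer.commitTransaction(); // both records become visible atomically to read_committed consumers
      } catch (KafkaException e) {
        producer.abortTransaction();  // aborted records are skipped by read_committed consumers
        throw e;                      // (for fatal errors such as ProducerFencedException, close the producer instead)
      }
    }
  }
}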
→ idempotency
Case covered by idempotency:
The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.
enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all' (see the config sketch below).
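A minimal config sketch for an idempotent (non-transactional) producer; the broker address is illustrative, and recent client versions already default acks/retries accordingly once idempotence is enabled:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerConfigDemo {
  static Properties idempotentProducerProps() {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    props.put(ProducerConfig.ACKS_CONFIG, "all");                        // required with idempotence
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);         // must be > 0
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // must be <= 5
    return props;
  }
}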
Case covered by transactions:
Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.
in distributed environments, applications will crash or, worse, temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost; the instances deemed lost may in fact still be running and writing, which is the "zombie instances" problem.
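A sketch of the read-process-write case that transactions cover: B is produced and A's consumed offset is committed inside the same transaction, so a crash before the commit leaves neither a lost offset nor a duplicate B visible to read_committed consumers; zombies sharing the same transactional.id are fenced by initTransactions(). Broker address, topic names, group id and transactional.id are illustrative.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReadProcessWriteDemo {
  public static void main(String[] args) {
    Properties cProps = new Properties();
    cProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    cProps.put(ConsumerConfig.GROUP_ID_CONFIG, "rpw-demo");
    cProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    cProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    cProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);          // offsets go through the producer instead
    cProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");  // skip open/aborted transactions

    Properties pProps = new Properties();
    pProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    pProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    pProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    pProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "rpw-demo-tx");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cProps);
         KafkaProducer<String, String> producer = new KafkaProducer<>(pProps)) {
      consumer.subscribe(Collections.singletonList("topic-A"));
      producer.initTransactions();

      while (true) { // a real application would also have a shutdown path
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        if (records.isEmpty()) continue;

        producer.beginTransaction();
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
          producer.send(new ProducerRecord<>("topic-B", record.key(), record.value().toUpperCase()));
          offsets.put(new TopicPartition(record.topic(), record.partition()),
                      new OffsetAndMetadata(record.offset() + 1));
        }
        // Commit "A was consumed" and "B was written" atomically: no duplicate B after a restart.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); // Kafka 2.5+ overload
        producer.commitTransaction();
      }
    }
  }
}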