Kafka

Kafka Streams

About rocksdb

Kafka connect

secrets

Kafka Connect has a mechanism to externalize secrets (the ConfigProvider interface):

We could easily implement an EnvConfigProvider that looks up values in Linux environment variables.
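
A minimal sketch of such a provider, implementing Kafka's ConfigProvider interface (the class name EnvConfigProvider is the hypothetical one from the note above):

import java.util
import org.apache.kafka.common.config.ConfigData
import org.apache.kafka.common.config.provider.ConfigProvider

import scala.collection.JavaConverters._

// Hypothetical provider that resolves config values from the process environment.
class EnvConfigProvider extends ConfigProvider {

  override def configure(configs: util.Map[String, _]): Unit = ()

  // `path` has no meaning for environment variables, so it is ignored.
  override def get(path: String): ConfigData =
    new ConfigData(sys.env.asJava)

  // Returns only the requested keys that are actually set in the environment.
  override def get(path: String, keys: util.Set[String]): ConfigData = {
    val values = keys.asScala.flatMap(k => sys.env.get(k).map(k -> _)).toMap
    new ConfigData(values.asJava)
  }

  override def close(): Unit = ()
}

Registered in the Connect worker config with something like config.providers=env and config.providers.env.class=com.example.EnvConfigProvider (alias and package assumed); a connector property could then reference a value as ${env::DB_PASSWORD} (empty path, key = variable name).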

s3 connector

Kafka auth

Kafka Streams

Topology

import org.apache.kafka.streams.StreamsBuilder;

final StreamsBuilder builder = new StreamsBuilder();
// [...]
// StreamsBuilder has no public `topology` field; build() returns the Topology.
System.out.println(builder.build().describe());
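
A minimal end-to-end sketch in Scala (topic names "events-in"/"events-out" are placeholders) showing what describe() reports:

import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.KStream

// One source piped to one sink; serdes would be needed to actually run it,
// but describe() only needs the wiring.
val builder = new StreamsBuilder()
val events: KStream[String, String] = builder.stream[String, String]("events-in")
events.to("events-out")

// Prints the sub-topologies with their sources, processors and sinks.
println(builder.build().describe())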

Avro

resources

In particular, two ways to read Avro records from a byte array:

// Reads an Avro object-container file: the writer schema is embedded in the file
// header; `schema` is used as the reader (expected) schema.
def read(bytes: Array[Byte],
         schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
  import org.apache.avro.file.{DataFileReader, SeekableByteArrayInput}
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

  val datumReader = new GenericDatumReader[GenericRecord](schema)
  val inputStream = new SeekableByteArrayInput(bytes)
  val dataFileReader = new DataFileReader[GenericRecord](inputStream, datumReader)

  import scala.collection.JavaConverters._
  val list = dataFileReader.iterator().asScala.toList

  dataFileReader.close()

  list
}
  
// Reads raw Avro binary (no container-file header), so the writer schema must be
// supplied explicitly; returns all records in the byte array.
def readBinary(bytes: Array[Byte],
               schema: org.apache.avro.Schema): List[org.apache.avro.generic.GenericRecord] = {
  import org.apache.avro.file.SeekableByteArrayInput
  import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
  import org.apache.avro.io.DecoderFactory

  import scala.collection.mutable.ListBuffer

  val datumReader = new GenericDatumReader[GenericRecord](schema)
  val inputStream = new SeekableByteArrayInput(bytes)
  val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)

  val result = new ListBuffer[GenericRecord]
  while (!decoder.isEnd) {
    result += datumReader.read(null, decoder)
  }

  result.toList
}

type promotion

Avro type promotion support (from the spec; see the sketch after the list):

    int is promotable to long, float, or double
    long is promotable to float or double
    float is promotable to double
    string is promotable to bytes
    bytes is promotable to string
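
A small sketch of the int → long case (the record schemas here are ad-hoc examples): a record written with an int field is read back with a reader schema that declares long.

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

val writerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"R","fields":[{"name":"n","type":"int"}]}""")
val readerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"R","fields":[{"name":"n","type":"long"}]}""")

// Write a record with the int schema.
val rec = new GenericData.Record(writerSchema)
rec.put("n", 42)

val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get.binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](writerSchema).write(rec, encoder)
encoder.flush()

// Schema resolution (writer -> reader) promotes the int value to long on read.
val decoder = DecoderFactory.get.binaryDecoder(out.toByteArray, null)
val promoted = new GenericDatumReader[GenericRecord](writerSchema, readerSchema).read(null, decoder)
println(promoted.get("n")) // 42 as a java.lang.Long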

avro-Parquet

Spark kafka

producers

Kafka transactions

transaction blog exactly once

For 1 KB messages and transactions lasting 100 ms, the producer throughput declines only by 3%, compared to the throughput of a producer configured for at least once, in-order delivery (acks=all, max.in.flight.requests.per.connection=1), and by 20% compared to the throughput of a producer configured for at most once delivery with no ordering guarantees (acks=1, max.in.flight.requests.per.connection=5), which is the current default. Smaller messages or shorter transaction commit intervals would result in more severe degradation.

the transactional consumer shows no degradation in throughput when reading transactional messages in read_committed mode

the main tradeoff when increasing the transaction duration is that it increases end-to-end latency. Recall that a consumer reading transactional messages will not deliver messages which are part of open transactions

a Kafka topic partition might have some messages that are part of a transaction while others are not.
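
A sketch of a read_committed consumer as described above (broker address, group id and topic are placeholders):

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.StringDeserializer

import scala.collection.JavaConverters._

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "tx-reader")               // placeholder
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
// Only messages from committed transactions (plus non-transactional messages) are
// delivered; open transactions hold records back, hence the end-to-end latency note above.
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("some-topic").asJava)
consumer.poll(Duration.ofMillis(500)).asScala.foreach(r => println(r.value()))
consumer.close()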

idempotency

Case covered by idempotency:

The producer.send() could result in duplicate writes of message B due to internal retries. This is addressed by the idempotent producer and is not the focus of the rest of this post.

enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.
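
A sketch of an idempotent producer configuration matching the constraints quoted above (broker address and topic are placeholders):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
// Implies acks=all, retries > 0 and max.in.flight.requests.per.connection <= 5,
// so broker-side deduplication of retried batches preserves ordering.
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord("some-topic", "key", "value"))
producer.close()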

Case covered by transactions:

Reprocessing may happen if the stream processing application crashes after writing B but before marking A as consumed. Thus when it resumes, it will consume A again and write B again, causing a duplicate.

in distributed environments, applications will crash or, worse, temporarily lose connectivity to the rest of the system. Typically, new instances are automatically started to replace the ones which were deemed lost; the "lost" instances may still be running, and these are the so-called "zombie instances."
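
A sketch of the producer side of such a read-process-write loop; names are placeholders. The transactional.id is what fences zombies: initTransactions() bumps the producer epoch, so an older instance with the same id is rejected with ProducerFencedException.

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-processor-1")  // placeholder

val producer = new KafkaProducer[String, String](props)
producer.initTransactions()
try {
  producer.beginTransaction()
  producer.send(new ProducerRecord("output-topic", "key", "B"))
  // In a full consume-transform-produce loop, the consumed offsets for A would be
  // committed inside the same transaction via sendOffsetsToTransaction(...).
  producer.commitTransaction()
} catch {
  case _: ProducerFencedException =>
    // A newer instance with the same transactional.id took over: this one is a zombie.
    producer.close()
  case _: KafkaException =>
    producer.abortTransaction()
}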

consumers

React ?
