Kafka – Simple Producer / Consumer Example

Last time we discussed Kafka in general. This time let’s write some code. Here is a quick tutorial on how to create a simple producer/consumer scenario in Scala.

Dependencies

In build.sbt we will add the following dependencies:

libraryDependencies ++= Seq(
  "org.apache.kafka" % "kafka-clients" % "0.10.2.1",
  "ch.qos.logback" % "logback-classic" % "1.0.13",
  "ch.qos.logback" % "logback-core" % "1.0.13"
)

Nothing fancy here: besides the Kafka client, there are Logback dependencies so that we can see Kafka's logs in the console.

Simple producer

In order to create a producer we need to prepare its configuration. There are lots of things we can configure, but only three of them are mandatory:

  • bootstrap.servers – a list of Kafka brokers used to initialize the connection to the Kafka cluster. We don’t need to know all the addresses: during initialization the client connects to the brokers from the list and obtains the addresses of the others. It is still better to put more than one address here, since some brokers in the cluster may be unavailable when the connection starts
  • key.serializer – name of the class that will serialize the keys of our messages
  • value.serializer – name of the class that will serialize the values of our messages
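The same three settings can also be written with the constants from ProducerConfig and the serializer's class object instead of raw strings, which guards against a misspelled property name. This is a minimal sketch assuming the kafka-clients dependency from build.sbt above; the ProducerSettings object and its mandatory method are hypothetical names used only for illustration.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper: builds the three mandatory producer settings
// using the constants defined in ProducerConfig.
object ProducerSettings {

  def mandatory(bootstrapServers: String): Properties = {
    val props = new Properties()
    // Comma-separated "host:port" list; more than one entry is safer
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    // classOf[...].getName yields the fully qualified class name as a String
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props
  }

  def main(args: Array[String]): Unit =
    println(mandatory("127.0.0.1:9092,127.0.0.1:9093"))
}
```

The resulting Properties object contains exactly the same keys and values as the string-based version, so it can be passed to the KafkaProducer constructor in the same way.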

Creating the producer looks like this:

import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer

val producerProperties = new Properties()
producerProperties.put("bootstrap.servers", "127.0.0.1:9092")
producerProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](producerProperties)

In order to send a message to Kafka we need to create an instance of ProducerRecord:

import org.apache.kafka.clients.producer.ProducerRecord

val record = new ProducerRecord[String, String]("test", "key", "value")

The first argument is the name of the topic we want to write to, the second one is our key, and the last one is the message itself.
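ProducerRecord has other constructors besides the three-argument one. A short sketch, again assuming kafka-clients on the classpath (the RecordShapes object is a hypothetical name): a record without a key, and one sent to an explicitly chosen partition.

```scala
import org.apache.kafka.clients.producer.ProducerRecord

object RecordShapes {
  // topic + key + value: the producer's partitioner picks the partition
  // (the default partitioner uses a hash of the serialized key)
  val withKey = new ProducerRecord[String, String]("test", "key", "value")

  // topic + value only: the key is null and the default partitioner
  // spreads such records over the topic's partitions
  val withoutKey = new ProducerRecord[String, String]("test", "value")

  // topic + partition + key + value: the record goes to partition 0 explicitly
  val pinned = new ProducerRecord[String, String]("test", Integer.valueOf(0), "key", "value")

  def main(args: Array[String]): Unit =
    Seq(withKey, withoutKey, pinned).foreach(println)
}
```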

There are a couple of styles of sending messages to Kafka, but since today we are writing a simple “Hello World” application we will not discuss them; we will use the simplest approach:

producer.send(record)
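For reference, send is asynchronous: it returns a java.util.concurrent.Future[RecordMetadata] and also accepts an optional Callback. The sketch below shows the three usual styles (fire-and-forget, blocking on the future, and a callback). It assumes kafka-clients on the classpath and uses the library's MockProducer so that it runs without a broker; the SendStyles object is a hypothetical name.

```scala
import org.apache.kafka.clients.producer.{Callback, MockProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

object SendStyles {
  // autoComplete = true: every send completes immediately, no broker needed
  val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)

  def run(): RecordMetadata = {
    val record = new ProducerRecord[String, String]("test", "key", "value")

    // 1. Fire-and-forget: the returned Future is ignored
    producer.send(record)

    // 2. Synchronous: block until the write is acknowledged (here by the mock)
    val metadata = producer.send(record).get()

    // 3. Asynchronous: the callback runs when the send completes or fails
    producer.send(record, new Callback {
      override def onCompletion(m: RecordMetadata, e: Exception): Unit =
        if (e != null) e.printStackTrace()
        else println(s"written to ${m.topic}-${m.partition} at offset ${m.offset}")
    })

    metadata
  }

  def main(args: Array[String]): Unit = {
    run()
    producer.close()
  }
}
```

With a real KafkaProducer the calls look the same; only the acknowledgement then comes from the broker instead of the mock.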

When we no longer need to send messages to Kafka, we can close the producer:

producer.close()

Simple consumer

Creating a consumer looks similar to creating a producer. Again, there are three mandatory configuration properties to pass:

  • bootstrap.servers – exactly the same value as for the producer
  • key.deserializer – class that will deserialize keys of messages for us
  • value.deserializer – class that will deserialize messages for us

It is quite obvious but still worth mentioning: the producer’s serializer should match the consumer’s deserializer.
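To illustrate why: the serializer turns a value into bytes, and the deserializer has to turn exactly those bytes back into a value. A small sketch using the serializers shipped with kafka-clients (the SerdeMatch object is a hypothetical name); the second method shows what happens when an IntegerSerializer on the producer side meets a StringDeserializer on the consumer side.

```scala
import org.apache.kafka.common.serialization.{IntegerSerializer, StringDeserializer, StringSerializer}

object SerdeMatch {
  // Matching pair: String -> UTF-8 bytes -> String
  def roundTrip(value: String): String = {
    val bytes = new StringSerializer().serialize("test", value)
    new StringDeserializer().deserialize("test", bytes)
  }

  // Mismatched pair: the 4 big-endian bytes of an Int are decoded as UTF-8 text
  def mismatch(value: Int): String = {
    val bytes = new IntegerSerializer().serialize("test", Integer.valueOf(value))
    new StringDeserializer().deserialize("test", bytes)
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip("value")) // value
    println(mismatch(42))       // not "42": control characters followed by '*'
  }
}
```

Nothing fails loudly in the mismatched case; the consumer simply reads garbage, which is why the pairing is worth checking.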

There is one more property that is not mandatory but is set very often: group.id, which specifies the consumer group this consumer should join.

import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer

val consumerProperties = new Properties()
consumerProperties.put("bootstrap.servers", "127.0.0.1:9092")
consumerProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
consumerProperties.put("group.id", "dummy-group")

val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](consumerProperties)

Now it is a good time to subscribe to some topic:

import java.util.Collections

consumer.subscribe(Collections.singletonList("test"))

Now it is time to read some messages. Usually we want a loop in which we read messages and process them somehow:

while (true) {
  // wait up to 100 ms for new records
  val records = consumer.poll(100)
}

The poll method takes the maximum time, in milliseconds, that the consumer will block waiting for messages. If records are available sooner, poll returns them right away; otherwise, once the timeout expires, it returns whatever has been fetched (which can of course be nothing). There are other configuration options that influence how messages are read from Kafka, but I will not cover them today.
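To see the poll cycle without a running broker, here is a sketch built on the MockConsumer class that ships with kafka-clients. Assumptions: the PollSketch object and consumeValues method are hypothetical names, the partition is assigned directly instead of through subscribe to keep the mock setup short, and the loop is bounded so that it terminates. It uses poll with a millisecond timeout, as in the 0.10.2.1 client from build.sbt.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRecord, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.TopicPartition

object PollSketch {
  def consumeValues(polls: Int): Seq[String] = {
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    val partition = new TopicPartition("test", 0)

    // The mock has no group coordinator, so assign the partition manually
    consumer.assign(Collections.singletonList(partition))
    consumer.updateBeginningOffsets(Map(partition -> java.lang.Long.valueOf(0L)).asJava)

    // Pretend two messages were produced to partition 0 of "test"
    consumer.addRecord(new ConsumerRecord[String, String]("test", 0, 0L, "key", "hello"))
    consumer.addRecord(new ConsumerRecord[String, String]("test", 0, 1L, "key", "world"))

    try {
      // Bounded version of the while (true) loop above
      (1 to polls).flatMap { _ =>
        val records = consumer.poll(100) // empty once everything has been read
        records.asScala.map(_.value)
      }
    } finally consumer.close()
  }

  def main(args: Array[String]): Unit =
    println(consumeValues(3))
}
```

The first poll returns both records; the following polls return empty batches, which is the "it can be of course nothing" case.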

Once we have the records, we can iterate over them and do something with each one; in our example we just print all the fields of every ConsumerRecord instance:

import org.apache.kafka.clients.consumer.ConsumerRecord

for (record: ConsumerRecord[String, String] <- records) {
  println() // blank line between records
  println("checksum: " + record.checksum)
  println("key: " + record.key)
  println("offset: " + record.offset)
  println("partition: " + record.partition)
  println("serializedKeySize: " + record.serializedKeySize)
  println("serializedValueSize: " + record.serializedValueSize)
  println("timestamp: " + record.timestamp)
  println("timestampType: " + record.timestampType)
  println("topic: " + record.topic)
  println("value: " + record.value)
}

(To make this code compile we need to import scala.collection.JavaConversions._, which implicitly converts the Java ConsumerRecords collection into a Scala one.)
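JavaConversions works through implicit conversions and has been deprecated since Scala 2.12. As an alternative, here is a sketch that converts explicitly with asScala from scala.collection.JavaConverters (on Scala 2.13 and later the preferred import is scala.jdk.CollectionConverters._). It assumes kafka-clients on the classpath; the RecordPrinter helper is hypothetical, and the hand-built batch stands in for the result of consumer.poll.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords}
import org.apache.kafka.common.TopicPartition

object RecordPrinter {
  // Hypothetical helper: one line per record, converted with an explicit asScala
  def describe(records: ConsumerRecords[String, String]): Seq[String] =
    records.asScala.toList.map { record =>
      s"${record.topic}-${record.partition}@${record.offset} ${record.key}=${record.value}"
    }

  def main(args: Array[String]): Unit = {
    // Hand-built batch standing in for the result of consumer.poll(100)
    val record = new ConsumerRecord[String, String]("test", 0, 5L, "key", "value")
    val batch = new ConsumerRecords[String, String](
      Collections.singletonMap(new TopicPartition("test", 0), Collections.singletonList(record)))
    describe(batch).foreach(println) // test-0@5 key=value
  }
}
```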

At the end we close the consumer:

consumer.close()

Closing the consumer properly is important because it lets the remaining consumers in the same consumer group take over its partitions immediately. If we don’t close it, the group first has to wait for this consumer’s session to time out before partitions are reassigned, which means both a delay and some partitions of the topic not being read during that time.
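One common way to make sure close actually runs is to call wakeup from another thread (wakeup is the one KafkaConsumer method documented as safe to call from a different thread), which makes a blocked poll throw WakeupException, and to close in a finally block. A minimal sketch, assuming kafka-clients 0.10.2.1 on the classpath; it is written against the Consumer interface so it can be exercised with MockConsumer, and the GracefulLoop object is a hypothetical name.

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.{Consumer, MockConsumer, OffsetResetStrategy}
import org.apache.kafka.common.errors.WakeupException

object GracefulLoop {
  // Polls until another thread calls consumer.wakeup(), then always closes the consumer
  def run(consumer: Consumer[String, String], handle: String => Unit): Unit =
    try {
      while (true) {
        consumer.poll(100).asScala.foreach(record => handle(record.value))
      }
    } catch {
      case _: WakeupException => () // expected on shutdown, nothing to do
    } finally {
      consumer.close() // leaves the group so its partitions can be reassigned right away
    }

  def main(args: Array[String]): Unit = {
    val consumer = new MockConsumer[String, String](OffsetResetStrategy.EARLIEST)
    consumer.subscribe(Collections.singletonList("test"))
    consumer.wakeup() // simulate a shutdown request arriving before the first poll
    run(consumer, println)
    println("loop finished, consumer closed")
  }
}
```

In a real application, a JVM shutdown hook would call consumer.wakeup() and then wait for the polling thread to finish, so that close completes before the process exits.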

The subject of properly closing a consumer in various scenarios is big enough to write about some other day.

But apart from that, we have our simple Kafka “Hello World” in Scala.

Yay! ;)

Source code containing this example: https://github.com/mprzybylak/kafka-minefield