Hello,
I implemented Apache Kafka 0.8 protocol support for Gatling 2.1.3.
However, the active-users graph in the HTML report keeps growing even when I use constantUsersPerSec (see the attached image), although the number of threads in the Java process doesn't grow much during the stress test.
The simulation file I used is as follows:
package com.github.mnogu.gatling.kafka.test

import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._
import com.github.mnogu.gatling.kafka.Predef._

class BasicSimulation extends Simulation {
  val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092"))

  val scn = scenario("Kafka Test")
    .exec(kafka("request").send("foo"))

  // You can also use feeder
  //
// ... (file truncated)
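For reference, a constantUsersPerSec injection profile in Gatling 2.1 typically looks like the following (this is not part of the truncated file; the rate and duration are illustrative values):

```scala
import scala.concurrent.duration._

// Illustrative setUp block: inject a constant arrival rate of 10 users/sec
// for 30 seconds. With a flat arrival rate, the active-user count should
// plateau once users finish as fast as new ones arrive.
setUp(
  scn.inject(constantUsersPerSec(10) during (30 seconds))
).protocols(kafkaConf)
```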
And the code to send requests to Apache Kafka 0.8 is as follows:
package com.github.mnogu.gatling.kafka.action

import akka.actor.ActorRef
import com.github.mnogu.gatling.kafka.config.KafkaProtocol
import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
import io.gatling.core.action.{Failable, Interruptable}
import io.gatling.core.result.message.{KO, OK}
import io.gatling.core.result.writer.DataWriterClient
import io.gatling.core.session._
import io.gatling.core.util.TimeHelper.nowMillis
import io.gatling.core.validation.Validation
import org.apache.kafka.clients.producer._
import scala.collection.JavaConverters._

object KafkaRequestAction extends DataWriterClient {
  def reportUnbuildableRequest(
      requestName: String,
      session: Session,
// ... (file truncated)
Am I doing something wrong? If so, how can I fix my code?
Thanks in advance.
Hi,
Your issue is that you’re blocking on the send Futures: https://github.com/mnogu/gatling-kafka/blob/master/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala#L61
As a result, you get thread starvation, and users pile up in the start queue without getting a chance to run.
KafkaProducer.send returns a regular Future, not a CompletableFuture, so you can't react on completion without blocking. You should instead use the overload that takes a Callback:
https://github.com/apache/kafka/blob/0.8.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L212
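A non-blocking send along those lines might look like this (a sketch only, mirroring the 0.8.1 client where KafkaProducer is not yet generic; the `onDone` handler is a made-up name standing in for whatever logs OK/KO and resumes the user):

```scala
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

// Sketch of the non-blocking pattern: send() returns immediately, and the
// Callback fires on the producer's I/O thread once the broker has responded,
// so the caller's thread is never parked waiting on the Future.
def sendAsync(producer: KafkaProducer, record: ProducerRecord)
             (onDone: Either[Exception, RecordMetadata] => Unit): Unit =
  producer.send(record, new Callback {
    override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
      if (exception != null) onDone(Left(exception)) // failed send -> report KO
      else onDone(Right(metadata))                   // acked send -> report OK
  })
```

The key point is that all bookkeeping happens inside onCompletion rather than after a blocking `.get()` on the returned Future.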
Get it?
Cheers,
The number of active users still grows with a callback, but the growth rate becomes smaller:
  session: Session): Validation[Unit] = {
    payload(session).map { resolvedPayload =>
      val producer = new KafkaProducer(kafkaProtocol.properties.asJava)
      val record = new ProducerRecord(
        kafkaProtocol.topic, resolvedPayload.getBytes)

      val requestStartDate = nowMillis
      val requestEndDate = nowMillis

      // send the request
      producer.send(record, new Callback() {
        override def onCompletion(m: RecordMetadata, e: Exception): Unit = {
          val responseStartDate = nowMillis
          val responseEndDate = nowMillis

          // log the outcome
          writeRequestData(
            session,
            requestName,
            requestStartDate,
On Thursday, January 29, 2015 at 9:24:13 AM UTC+9, Stéphane Landelle wrote:
Are you sure creating and closing a producer on each event is the proper pattern? Is this how Kafka is supposed to be used? Shouldn't such a component be reused and shared?
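One way to follow that advice is to create a single producer lazily and share it across all virtual users, closing it only when the simulation ends. A sketch under those assumptions (the object and method names here are illustrative, not part of the gatling-kafka code):

```scala
import org.apache.kafka.clients.producer.KafkaProducer

// Sketch: one KafkaProducer shared by every virtual user, instead of a new
// instance per request. KafkaProducer is thread-safe, so sharing one is the
// intended usage; creating one per event pays connection setup and metadata
// fetch costs on every single send.
object SharedProducer {
  @volatile private var producer: KafkaProducer = _

  // lazily create the producer on first use
  def apply(configs: java.util.Map[String, Object]): KafkaProducer = synchronized {
    if (producer == null) producer = new KafkaProducer(configs)
    producer
  }

  // call once at simulation shutdown to flush and release resources
  def close(): Unit = synchronized {
    if (producer != null) {
      producer.close()
      producer = null
    }
  }
}
```

The action would then call `SharedProducer(kafkaProtocol.properties.asJava)` instead of `new KafkaProducer(...)`, and the protocol's teardown would call `SharedProducer.close()`.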