Custom WS methods.

Hi,

I’m trying to implement a scenario where each user replays a number of messages on a websocket. The messages that the user sends to the websocket are stored in a number of files, one message per line, e.g.

foo.log

{"foo":"bar"}
...
{"foo":"box"}

There are five such files; each is a different seed template for a given Gatling session.

So I wrote some extensions to websocket classes.

The first adds a sendReplay method to the Ws class.

object WsOps {
  implicit class WsSendReplay(ws: Ws) {
    def sendReplay(replayId: Expression[String],
                   replacementId: Expression[String]):WsSendBuilder = {
      new EventReplaySendBuilder(
        replacementId,
        "replay",
        _ => Success(TextMessage("")), // unused
        None, // unused
        replayId,
        replacementId
      )
    }
  }
}

The replayId is the filename for the template (foo for the example shown above). The replacementId is a random int that will replace the replayId when replaying the messages. Now calling ws("").sendReplay will give back an EventReplaySendBuilder…

class EventReplaySendBuilder(
    requestName: Expression[String],
    wsName: String,
    message: Expression[WsMessage],
    checkBuilder: Option[AsyncCheckBuilder],
    replayEventId: Expression[String],
    replacementId: Expression[String]
) extends WsSendBuilder(requestName, wsName, message, checkBuilder) {

  private val log = LoggerFactory.getLogger(getClass)
  override def check(checkBuilder: AsyncCheckBuilder): WsSendBuilder = {
    new EventReplaySendBuilder(requestName, wsName, message, Some(checkBuilder), replayEventId, replacementId)
  }

  override def build(ctx: ScenarioContext, next: Action): Action = {
    new EventReplaySend(requestName, wsName, message, checkBuilder,
      ctx.coreComponents.statsEngine, next, replayEventId, replacementId)
  }
}

The next class extends WsSend


class EventReplaySend(
    override val requestName: Expression[String],
    wsName: String,
    message: Expression[WsMessage],
    checkBuilder: Option[AsyncCheckBuilder],
    statsEngine: StatsEngine,
    override val next: Action,
    replayEventId: Expression[String],
    replacementId: Expression[String]
) extends WsSend(requestName, wsName, message, checkBuilder, statsEngine, next) {

  private val log = LoggerFactory.getLogger(getClass)

  def eventReplay(sourceId: String, newId: String): List[(String, Long)] = {
    val sourceFile = Paths.get(GatlingFiles.dataDirectory.toAbsolutePath.toString, "itf", sourceId)
    log.warn(s"event replay $sourceId $newId ${GatlingFiles.dataDirectory.toAbsolutePath.toString}")
    val replayLines = Source.fromFile(sourceFile.toUri).getLines().toList
    val first = replayLines.head
    val second = replayLines.tail.head
    replayLines.drop(1).zip(replayLines.drop(2)).foldLeft(
      List((first, timeDifference(first, second)))
    ) { case (messageChain, (last, nextMessage)) =>
      (nextMessage.replaceAllLiterally(sourceId, newId), timeDifference(last, nextMessage)) :: messageChain
    }.reverse
  }

  override def sendRequest(requestName: String, session: Session): Validation[Unit] = {
    log.debug(s"Send request called $requestName")
    for {
      wsActor <- fetchActor(wsName, session)
      messageSource <- replayEventId(session)
      replacementId <- replacementId(session)
    } yield {

So from playing around with the sendRequest method I implemented: when I comment out the Thread.sleep, all the messages from my replay are published.

So it seems the answer to my first question is yes, it is a stupid thing to do.

Does anyone on the team have any idea what is causing this?

When the Thread.sleep is not commented out, I see the following message in the logs:

12:42:14.069 [DEBUG] o.a.n.r.NettyRequestSender - Unable to recover future NettyResponseFuture{currentRetry=1,
isDone=1,
isCancelled=0,
asyncHandler=org.asynchttpclient.AsyncCompletionHandlerBase@304c3d28,
nettyRequest=org.asynchttpclient.netty.request.NettyRequest@754eb545,
future=java.util.concurrent.CompletableFuture@6d502b22[Completed exceptionally],
uri=http://gatling.io,
keepAlive=true,
redirectCount=0,
timeoutsHolder=null,
inAuth=0,
statusReceived=0,
touch=1502970123977}

It seems like it could be related?

So most of my code was unnecessary, and I had completely misunderstood the Gatling way of doing things. The answer to my question “Is this a stupid thing to do?” is definitely yes.

Instead of all of that code, I just use an exec to set my replay messages as a value in the session and then use foreach to send each step.
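
A minimal sketch of that approach might look like the following. It is not the exact code from this thread: the session attribute names (replayId, replacementId, replayMessages, replayMessage), the loadReplay helper and the data/itf/ path are all hypothetical, chosen only for illustration. It also assumes the websocket has already been opened earlier in the scenario, that replayId and replacementId were put into the session beforehand, and that the Gatling version in use accepts the "${...}" expression syntax.

```scala
import io.gatling.core.Predef._
import io.gatling.http.Predef._

import scala.io.Source

object ReplayChain {

  // Hypothetical helper: read the template file (one message per line) and
  // swap the template id for the replacement id in every message.
  def loadReplay(path: String, sourceId: String, newId: String): List[String] = {
    val source = Source.fromFile(path)
    try source.getLines().map(_.replace(sourceId, newId)).toList
    finally source.close()
  }

  val replay =
    exec { session =>
      // Assumed to have been set earlier, e.g. by a feeder.
      val sourceId = session("replayId").as[String]
      val newId    = session("replacementId").as[String]
      // Store the whole replay in the session; the path is an assumption.
      session.set("replayMessages", loadReplay(s"data/itf/$sourceId", sourceId, newId))
    }.foreach("${replayMessages}", "replayMessage") {
      // One regular websocket send per stored message.
      exec(ws("replay").sendText("${replayMessage}"))
    }
}
```

Because each send is an ordinary Gatling action, nothing in the loop calls Thread.sleep. If the messages need to be spaced out, a pause step inside the foreach body would be the place to do it.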