Hello,
I’m new to Gatling, and I am interested in its built-in support for SSE. I managed to write a basic test.
Now I would like to write a test to verify how many clients can be connected to the endpoint at the same time, and be sure that they correctly receive an event when it is generated on server side.
The scenario I’m looking for is:
- progressively connect as many clients as possible to the SSE endpoint (1K, 10K, maybe more if possible)
- once all clients are connected and waiting for the event, call a REST API only once to generate a single event (this event should be broadcast to all clients)
- assert that each client correctly received the event, ideally checking the timing
- close all SSE connections
I managed to use the rendezVous function to wait for all my clients to be connected. Now I would like to know how to run this single http request to generate the event.
import scala.concurrent.duration._
import io.gatling.core.Predef._
import io.gatling.http.Predef._

class SseSimulation extends Simulation {
  val httpProtocol = http
    .baseUrl("http://localhost:9000")
    .acceptHeader("text/event-stream")
  val userCount = 1000
  val scn = scenario("SSEConnect")
    .exec(sse("Connect")
      .connect("/api/streamEvents")
    )
    .rendezVous(userCount)
    // HERE I would like to have only one HTTP call to /api/generateEvent
    .exec(sse("Check Event").setCheck
      .await(30.seconds)(
        sse
          .checkMessage("Check event")
          .check(regex("""\{"data":"(.*)"\}"""))
      )
    )
    .exec(sse("Close").close)
  setUp(scn.inject(rampUsers(userCount).during(10.seconds)).protocols(httpProtocol))
}
I see two problems:
- how to execute the http request once, and not one time per user?
- how to avoid the race condition where the event is missed because the check is registered after /api/generateEvent has been called?
Hello!
Welcome to the Gatling community!
As I understand your example, you have 2 different kinds of users:
- your N users that want to see something on the SSE channel
- one user that will call your REST API to generate the event
For me, different kinds of users should be coded as 2 different scenarios.
import scala.concurrent.duration._
import io.gatling.core.Predef._
import io.gatling.http.Predef._

class SseSimulation extends Simulation {
  val httpProtocol = http
    .baseUrl("http://localhost:9000")
    .acceptHeader("text/event-stream")
  val userCount = 1000
  // Listeners: connect, wait for everyone, then expect the broadcast event.
  val scnListener = scenario("SSEConnect")
    .exec(sse("Connect")
      .connect("/api/streamEvents")
    )
    .rendezVous(userCount)
    .exec(sse("Check Event").setCheck
      .await(30.seconds)(
        sse
          .checkMessage("Check event")
          .check(regex("""\{"data":"(.*)"\}"""))
      )
    )
    .exec(sse("Close").close)
  // Sender: a single user that triggers the event through the REST API.
  val scnSender = scenario("SseSender")
    .exec(http("My call to API")
      .post("/api/send/event/endpoint")
      .body(StringBody("Call ${currentTimeMillis()}"))
    )
  setUp(
    scnListener.inject(rampUsers(userCount).during(10.seconds)).protocols(httpProtocol),
    // Injection steps are separate arguments to inject, not chained calls.
    scnSender.inject(nothingFor(15.seconds), atOnceUsers(1)).protocols(httpProtocol)
  )
}
Note: I didn’t test the code myself.
The currentTimeMillis in the request body lets you compare the time the event was sent with the time it was received in your listener user.
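To illustrate the timing comparison, here is a minimal sketch in plain Scala (no Gatling dependency). It assumes the server copies the request body (for example "Call 1700000000000") into the data field of the event unchanged, which the thread does not confirm. EventLatency and its method names are hypothetical helpers, not part of Gatling.

```scala
object EventLatency {
  // Matches the payload format used by the sender, e.g. "Call 1700000000000".
  private val SentAt = """Call (\d+)""".r

  /** Extracts the send timestamp (epoch millis) embedded in the event data, if present. */
  def sentAtMillis(data: String): Option[Long] =
    SentAt.findFirstMatchIn(data).map(_.group(1).toLong)

  /** Delivery latency in millis, given the time at which the listener received the event. */
  def latencyMillis(data: String, receivedAtMillis: Long): Option[Long] =
    sentAtMillis(data).map(sent => receivedAtMillis - sent)
}
```

On the listener side, the string captured by the regex check could be passed to latencyMillis together with System.currentTimeMillis(). Both timestamps then come from the injector's clock, so no clock synchronization with the server is needed.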
Hope it helps!
Cheers!
Hi Sébastien,
I tried (with 10 users only) and for some reason the scnListener scenario looks stuck, producing this kind of log in a loop:
---- SSEConnect ---------------------------------------------------------
[--------------------------------------------------------------------------] 0%
waiting: 0 / active: 10 / done: 0
---- SseSender ------------------------------------------------------------
[##########################################################################]100%
waiting: 0 / active: 0 / done: 1
Is there any way to reproduce on our side?
I found the issue, it was on our server side. We were not returning a response immediately after the connection, and as a result the clients were stuck on the connect step.
The next step would be to make the overall process more reliable. I’m not very fond of the hardcoded nothingFor(15.seconds) in the scnSender scenario, as it isn’t very safe: if clients take longer to connect and reach the rendezVous, they will miss the event. Are you aware of any way to synchronize the two scenarios?
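Pseudo code:
val allSseClientsReady = new AtomicBoolean(false)
// Sender: poll the flag every 100 ms, then send the event once it is set.
val scnSender =
  doWhile(session => !allSseClientsReady.get) {
    pause(100.millis)
  }.exec(sendEvent)
// Listeners: the flag is set only after every client has passed the rendezVous.
val scnListener =
  exec(connect)
    .rendezVous(userCount)
    .exec { session => allSseClientsReady.set(true); session }
    .exec(setCheck)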
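A possible variation on the same idea, sketched in plain Scala: a shared counter that each listener increments once it is connected, so the sender can poll for "all N clients ready" without relying on a boolean flag. ReadyGate is a hypothetical helper, not a Gatling API, and the wiring shown in the comments is untested.

```scala
import java.util.concurrent.atomic.AtomicInteger

/** Thread-safe gate that opens once `expected` participants have reported in. */
final class ReadyGate(expected: Int) {
  private val ready = new AtomicInteger(0)

  /** Called once by each listener after it has connected. */
  def markReady(): Unit = {
    ready.incrementAndGet()
    ()
  }

  /** True once at least `expected` listeners have called markReady. */
  def allReady: Boolean = ready.get >= expected
}

// Assumed wiring, mirroring the pseudo code above:
//   val gate = new ReadyGate(userCount)
//   listener: exec(connect).exec { session => gate.markReady(); session }.exec(setCheck)
//   sender:   doWhile(session => !gate.allReady) { pause(100.millis) }.exec(sendEvent)
```

As with the boolean flag, this only tells the sender that clients are connected, not that each check is already registered, so the race mentioned earlier in the thread may still need separate handling.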