LEAK: ByteBuf.release()

I have a WebSocket simulation using Gatling 3 that spins up 2 concurrent groups of 5 user sockets each. Each socket subscribes to a server, which then starts sending messages at a rapid rate; the check sits in an asLongAs loop that resends an empty string to re-arm the check so it can listen for more messages. I'm trying to confirm that each socket receives every message it should, in terms of sheer count: each user should receive 100 messages within ~40 seconds. Instead, at roughly 30 seconds in, I get this error:

20:42:59.996 [ERROR] i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.PooledByteBufAllocator.newHeapBuffer(PooledByteBufAllocator.java:314)
io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:166)
io.netty.buffer.AbstractByteBufAllocator.heapBuffer(AbstractByteBufAllocator.java:157)
io.netty.handler.codec.compression.JdkZlibDecoder.decode(JdkZlibDecoder.java:180)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
io.netty.channel.embedded.EmbeddedChannel.writeInbound(EmbeddedChannel.java:325)
io.netty.handler.codec.http.websocketx.extensions.compression.DeflateDecoder.decode(DeflateDecoder.java:68)
io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateDecoder.decode(PerMessageDeflateDecoder.java:64)
io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateDecoder.decode(PerMessageDeflateDecoder.java:30)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:844)
20:43:00.003 [ERROR] i.n.u.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records:
Created at:
io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:221)
io.netty.buffer.AbstractByteBufAllocator.compositeDirectBuffer(AbstractByteBufAllocator.java:216)
io.netty.buffer.AbstractByteBufAllocator.compositeBuffer(AbstractByteBufAllocator.java:191)
io.netty.handler.codec.http.websocketx.extensions.compression.DeflateDecoder.decode(DeflateDecoder.java:73)
io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateDecoder.decode(PerMessageDeflateDecoder.java:64)
io.netty.handler.codec.http.websocketx.extensions.compression.PerMessageDeflateDecoder.decode(PerMessageDeflateDecoder.java:30)
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:88)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:897)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
java.base/java.lang.Thread.run(Thread.java:844)
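
For context on what the detector is saying: a ByteBuf was garbage-collected while its reference count was still positive, i.e. whichever handler consumed it last never released it. The consumer-side pattern the linked wiki prescribes is roughly the sketch below; the handler shape is illustrative, not Gatling's actual code.

import io.netty.util.ReferenceCountUtil

// Whoever consumes a reference-counted message last must release it;
// otherwise the leak detector reports exactly this error once the
// buffer is garbage-collected with a positive refCnt.
def consume(msg: AnyRef): Unit =
  try {
    // ... read the buffer's contents ...
  } finally {
    ReferenceCountUtil.release(msg)
  }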

I'm using a single check for all sockets (var check = checkLogic) and reusing that same "check" variable everywhere, so it acts as an accumulator reflecting the total number of messages received across all user sockets. In my case I'm expecting the total to ultimately reach 1000, but it always falls short, typically landing between 800 and 980, and the error above is always thrown. The check logic is pretty basic:

val latest = exec(ws("Connect").connect("/ws/v1/subscribe"))
    .feed(feeder)
    .exec(
        ws("Latest Subscribe")
            .sendText(session => {
                // Subscription JSON text
            })
    )
    .exec(
        exitBlockOnFail {
            asLongAs(session => session("expectedTotal").as[Int] > session("totalPackets").as[Int], "i") {
                exec(ws("Send Blank").sendText("").await(30 seconds)(latestCheck))
            }
        }
    )
    .exec(ws("Disconnect").close)
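
One thing the snippet doesn't show: nothing in it advances totalPackets, so the loop condition depends on the check doing that elsewhere. As a minimal sketch, a plain session function could bump the counter after each awaited message (assuming totalPackets is seeded as an Int by the feeder; bumpCounter is a hypothetical name):

// Hypothetical helper: advance the attribute the asLongAs condition reads.
val bumpCounter = exec { session =>
  session.set("totalPackets", session("totalPackets").as[Int] + 1)
}

It would be chained right after the "Send Blank" exec inside the loop.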

This seems like fairly light usage, so I'd be surprised if it were a performance issue in Gatling. However, I need to determine whether:
a. Gatling 3's socket implementation is missing messages somewhere/somehow;
b. the error is causing the drops, and how I can avoid it; or
c. the sub-1000 count is actually how many messages are making it to the sockets (i.e. nothing wrong with Gatling :slight_smile: )

Thanks
Mike

Thanks for reporting!

https://github.com/gatling/gatling/issues/3626

No problem! Can you recommend a workaround for this use case? I just need to be able to count incoming messages on the sockets using the check functionality…

You can only wait until you’ve received an expected number of messages.
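
In 3.0 terms that means passing one frame check per expected message to await. A minimal sketch, assuming await accepts repeated checks as varargs and reusing latestCheck from the snippet above:

import scala.concurrent.duration._

// Completes only once all 100 checks have each matched an incoming
// message, or fails the action when the 40-second timeout fires first.
val awaitHundred = ws("Await 100 Messages")
  .sendText("")
  .await(40 seconds)(List.fill(100)(latestCheck): _*)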

Oh, is .until() supported in the Gatling 3 WebSocket API? I didn't see it in the docs.

I noticed you put in a fix for this (https://github.com/gatling/gatling/commit/203658e00111ea8c2b279b53d9aad0ed76f34cfe). Is there an easy way to drop this change into my 3.0 setup? A snapshot jar, etc.?

The easiest way is to install sbt and build from sources :slight_smile:

Is .await() supported in 3?

Also: I built the snapshot jars locally and moved them into my gatling3/lib folder. I see in gatling.sh that it puts all jars in /lib on the classpath, so how do I tell the engine to use the SNAPSHOT-3.0.2 jars instead of the 3.0.0 jars?

I dropped the snapshot files into the lib directory and removed their corresponding 3.0.0 jars. It runs locally, but .close is never called, and when I run it deployed in my Kubernetes cluster, the simulation crashes with this error:

21:56:57.614 [ERROR] i.g.c.a.SingletonFeed - Feed failed: Feeder crashed: j.u.NoSuchElementException: None.get, please report.
21:56:57.616 [ERROR] i.g.c.c.Controller - Simulation crashed
java.lang.IllegalStateException: Feeder crashed: j.u.NoSuchElementException: None.get
at io.gatling.core.action.SingletonFeed$$anonfun$receive$1.applyOrElse(SingletonFeed.scala:63)
at akka.actor.Actor.aroundReceive(Actor.scala:517)
at akka.actor.Actor.aroundReceive$(Actor.scala:515)
at io.gatling.core.akka.BaseActor.aroundReceive(BaseActor.scala:24)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:588)
at akka.actor.ActorCell.invoke(ActorCell.scala:557)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
It looks like the feeder is breaking. Strange that it's only happening in the cluster, although that box is Linux and my local machine is a Mac. Thoughts?
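
Worth noting: None.get is what Scala's Option.get throws on an empty Option, and here it's the feed that crashes, so a value the feeder resolves fine locally may simply be absent in the pod. A minimal fail-fast guard as a sketch (the feeder shape and the AUTH_TOKEN variable are hypothetical):

// Surface the missing value by name instead of a bare None.get.
def requiredEnv(name: String): String =
  sys.env.getOrElse(name, throw new IllegalStateException(s"Missing env var: $name"))

val guardedFeeder = Iterator.continually(Map(
  "authToken" -> requiredEnv("AUTH_TOKEN")
))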

OK, I solved that error, but .close is still not being called. This is the code:

val latestCheck = ws.checkTextMessage("Count of Latest Subscription Messages Received")
    .check(jsonPath("$..reqId").exists)

// GROUP 1
val mock1 = exec(ws("Connect").connect("/ws/v1/subscribe"))
    .feed(feeder)
    // SOCKET CONNECTION ESTABLISHED --> ADD CHECKS
    .exec(ws("Subscribe")
        // 1 LATEST SUBSCRIPTION
        .sendText(session => {
            new UserRequest(
                "latest_subscribe",
                session("reqId").as[Int],
                session("startTime").as[Int],
                session("schemaKey").as[String],
                session("lastValues").as[Boolean],
                session("authToken").as[String],
                "mock1",
                "[]"
            ).generateLatestRequest()
        })
    )
    .exec(
        exitBlockOnFail {
            asLongAs(session => session("expectedTotal").as[Int] > session("totalPackets").as[Int], "i") {
                exec(ws("Send Blank").sendText("").await(60 seconds)(latestCheck))
            }
        }
    )
    .exec(ws("Disconnect").close)

Before adding the new snapshot files, it would time out on latestCheck once no more data was coming and then close out. Now the timeout is hit and the failures are logged, but the test keeps running. I'm wondering if it's something to do with the loop?
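
In the meantime I'm considering bounding the loop as a workaround. A minimal sketch, assuming the asLongAs counter "i" starts at 0 and using a hypothetical maxAttempts cap, so a failure that slips past exitBlockOnFail can't spin forever:

import scala.concurrent.duration._

val maxAttempts = 120 // hypothetical cap, sized to the test window

// Stop when the expected total is reached OR the cap is hit,
// whichever comes first.
val boundedLoop = exitBlockOnFail {
    asLongAs(session =>
        session("expectedTotal").as[Int] > session("totalPackets").as[Int] &&
            session("i").as[Int] < maxAttempts, "i") {
        exec(ws("Send Blank").sendText("").await(60 seconds)(latestCheck))
    }
}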

Please provide a minimal sample (simulation and sample app) so we can reproduce on our side.