According to the Gatling documentation, when an HTTP/2 scenario starts, connections should be opened as in an HTTP/1 scenario: multiple connections for multiple requests. When the remote server uses the HTTP/2 protocol, unused connections should then be closed after http.pooledConnectionIdleTimeout.
In my test this did not happen: the connection count did not decrease at all.
Tested Gatling versions: 3.9.5, 3.10.3
Here is the simple way to reproduce:
The Gatling simulation (Scala):
/**
 * Reproduction simulation: a small, long-lived pool of virtual users that each
 * send one request every 10 seconds to a local HTTP/2 server, so that the number
 * of open connections can be observed on the server side.
 */
class LocalHttp2Simulation extends Simulation {

  // Ramp from 0 to 4 concurrent users over 1000 s, then hold 4 users for another 1000 s.
  private val injectionSteps = Seq(
    rampConcurrentUsers(0) to (4) during (1000 seconds),
    constantConcurrentUsers(4) during (1000 seconds)
  )

  // A single GET "/" that carries a request body, so the HEADERS frame is not
  // the last frame of the request stream.
  val localHttp2Request: ChainBuilder = {
    exec(
      http("LocalHttp2Request")
        .get("/")
        .body(StringBody("BODY"))
    )
  }

  // Deliberately NOT named `scenario`: a member with that name would shadow the
  // DSL's `scenario(...)` builder inside its own initializer.
  private val localHttp2Scenario = scenario("LocalHttp2Scenario")
    .forever {
      localHttp2Request
        .pace(10 seconds)
    }

  setUp(
    localHttp2Scenario.inject(injectionSteps)
      .protocols(
        httpProtocol(url = "https://127.0.0.1:443")
          .maxConnectionsPerHost(10)
          .connectionHeader("keep-alive")
          .enableHttp2
      )
  )
}
And the HTTP/2 server (Kotlin):
/**
 * Entry point of the local HTTP/2-over-TLS test server.
 *
 * Every accepted connection bumps a shared counter and gets a TLS handler plus
 * the ALPN negotiation handler installed in its pipeline.
 */
object Http2Server {
    private val logger: Logger = LoggerFactory.getLogger(Http2Server::class.java)

    /**
     * Binds the server to port 443 and blocks until the listening channel is closed.
     * The event loop group is shut down on the way out, whether or not startup succeeded.
     */
    @Throws(Exception::class)
    @JvmStatic
    fun main(args: Array<String>) {
        Security.addProvider(BouncyCastleProvider())
        val channelCounter = AtomicInteger(0)
        val sslCtx = createServerSslContext()
        val eventLoopGroup: EventLoopGroup = NioEventLoopGroup()
        try {
            // Per-connection setup: count the connection, then install TLS + ALPN.
            val connectionInitializer = object : ChannelInitializer<SocketChannel>() {
                @Throws(Exception::class)
                override fun initChannel(ch: SocketChannel) {
                    val opened = channelCounter.incrementAndGet()
                    logger.info("Current connections count = [$opened]")
                    val tlsHandler = sslCtx.newHandler(ch.alloc())
                    val alpnHandler = getServerAPNHandler(channelCounter)
                    ch.pipeline().addLast(tlsHandler, alpnHandler)
                }
            }

            val bootstrap = ServerBootstrap().apply {
                option(ChannelOption.SO_BACKLOG, 1024)
                // The same group is handed to the bootstrap for the listening and accepted channels.
                group(eventLoopGroup)
                channel(NioServerSocketChannel::class.java)
                handler(LoggingHandler(LogLevel.INFO))
                childHandler(connectionInitializer)
            }

            val serverChannel: Channel = bootstrap.bind(443).sync().channel()
            logger.info("HTTP/2 Server is listening on https://127.0.0.1:443/")
            serverChannel.closeFuture().sync()
        } finally {
            eventLoopGroup.shutdownGracefully()
        }
    }
}
/**
 * Helpers for the test server: a throw-away self-signed TLS identity and the
 * ALPN handler that wires the HTTP/2 pipeline.
 */
object Http2Util {
    /**
     * Builds a self-signed certificate for "CN=localhost" over [kp], valid for
     * 365 days starting now, signed with the pair's own private key via the "BC" provider.
     */
    private fun generateCertificate(kp: KeyPair): X509Certificate? {
        val notBefore = Instant.now()
        val notAfter = notBefore.plus(Duration.ofDays(365))
        val generator = X509V3CertificateGenerator().apply {
            setSerialNumber(BigInteger.valueOf(1))
            setSubjectDN(X509Principal("CN=localhost"))
            setIssuerDN(X509Principal("CN=localhost"))
            setPublicKey(kp.public)
            setNotBefore(Date.from(notBefore))
            setNotAfter(Date.from(notAfter))
            // NOTE(review): SHA-1 signatures are considered weak; acceptable only for a local test cert.
            setSignatureAlgorithm("SHA1WithRSAEncryption")
        }
        val issuerKey: PrivateKey = kp.private
        return generator.generate(issuerKey, "BC")
    }

    /**
     * Creates a JDK-provider server [SslContext] backed by a freshly generated
     * 4096-bit RSA key pair and self-signed certificate, advertising only HTTP/2 via ALPN.
     */
    fun createServerSslContext(): SslContext {
        val keyPair = KeyPairGenerator.getInstance("RSA")
            .apply { initialize(4096) }
            .generateKeyPair()
        val certificate = generateCertificate(keyPair)
        val alpnConfig = ApplicationProtocolConfig(
            Protocol.ALPN,
            SelectorFailureBehavior.NO_ADVERTISE,
            SelectedListenerFailureBehavior.ACCEPT,
            ApplicationProtocolNames.HTTP_2,
        )
        return SslContextBuilder.forServer(keyPair.private, certificate)
            .sslProvider(SslProvider.JDK)
            .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
            .applicationProtocolConfig(alpnConfig)
            .build()
    }

    /**
     * Returns an ALPN handler that, once HTTP/2 has been selected, appends the
     * HTTP/2 frame codec and an [Http2ServerResponseHandler] sharing [channelCounter].
     *
     * @throws IllegalStateException (from the handler) if any other protocol is negotiated
     */
    fun getServerAPNHandler(channelCounter: AtomicInteger): ApplicationProtocolNegotiationHandler =
        object : ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_2) {
            override fun configurePipeline(ctx: ChannelHandlerContext, protocol: String) {
                if (ApplicationProtocolNames.HTTP_2 != protocol) {
                    throw IllegalStateException("Protocol [$protocol] not supported")
                }
                val frameCodec = Http2FrameCodecBuilder.forServer().build()
                val responder = Http2ServerResponseHandler(channelCounter)
                ctx.pipeline().addLast(frameCodec, responder)
            }
        }
}
/**
 * Answers HTTP/2 requests with a fixed "Hello World" body and keeps the shared
 * connection counter in sync with channel teardown.
 *
 * @property channelCounter counter of currently open connections; this handler
 *   decrements it exactly once per channel, when the channel is unregistered.
 */
@Sharable
class Http2ServerResponseHandler(
    val channelCounter: AtomicInteger
) : ChannelDuplexHandler() {

    /**
     * Replies `200 OK` + [RESPONSE_BYTES] to a HEADERS frame that does not end
     * its stream; every non-HEADERS message is passed on to the next handler.
     */
    @Throws(Exception::class)
    override fun channelRead(ctx: ChannelHandlerContext, msg: Any) {
        if (msg !is Http2HeadersFrame) {
            super.channelRead(ctx, msg)
            return
        }
        // NOTE(review): only requests that announce a body (END_STREAM not set on
        // HEADERS) are answered; a body-less request gets no response. This matches
        // the accompanying load test, which always sends a body.
        if (msg.isEndStream) {
            return
        }

        val headers = DefaultHttp2Headers().status(HttpResponseStatus.OK.codeAsText())
        ctx.write(DefaultHttp2HeadersFrame(headers).stream(msg.stream()))

        // Ownership of `content` passes to the pipeline with the write below, which
        // releases it once the frame has been written. It must not be touched
        // afterwards: clearing it could blank a still-queued response body or
        // modify a buffer that has already been released and recycled.
        val content = ctx.alloc().buffer().writeBytes(RESPONSE_BYTES.duplicate())
        ctx.writeAndFlush(DefaultHttp2DataFrame(content, true).stream(msg.stream()))
    }

    /** Flushes whatever was written during the read burst. */
    @Throws(Exception::class)
    override fun channelReadComplete(ctx: ChannelHandlerContext) {
        ctx.flush()
    }

    /**
     * Logs the current count and forwards the disconnect. The counter is NOT
     * decremented here: teardown is accounted for once, in [channelUnregistered],
     * so a channel that sees both events is not counted down twice.
     */
    override fun disconnect(ctx: ChannelHandlerContext, promise: ChannelPromise) {
        logger.info("Established connections count = [${channelCounter.get()}]")
        super.disconnect(ctx, promise)
    }

    /** Counts the connection down and logs the new total. */
    override fun channelUnregistered(ctx: ChannelHandlerContext) {
        logger.info("Established connections count = [${channelCounter.decrementAndGet()}]")
        super.channelUnregistered(ctx)
    }

    companion object : KLogging() {
        /** Shared, unreleasable response payload; always copy from a `duplicate()`. */
        val RESPONSE_BYTES: ByteBuf = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Hello World", CharsetUtil.UTF_8))
    }
}