Quarkus, Kafka, java.lang.IllegalStateException: SRMSG00028 and the Solution

The following exception:


java.lang.IllegalStateException: SRMSG00028: The subscription to ... has been cancelled
at io.smallrye.reactive.messaging.extension.AbstractEmitter.verify(AbstractEmitter.java:165)
at io.smallrye.reactive.messaging.extension.AbstractEmitter.emit(AbstractEmitter.java:144)
at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:29)
at airhacks.kafka.jaxrs.boundary.MessagesResource.send(MessagesResource.java:34)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass.send$$superaccessor1(MessagesResource_Subclass.zig:209)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass$$function$$3.apply(MessagesResource_Subclass$$function$$3.zig:33)    

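is caused by injecting an Emitter into a RequestScoped managed bean. A request-scoped bean is destroyed at the end of every request, so the @PreDestroy method most likely completes the emitter's stream after the first request, and later calls to send fail: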


import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;

import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

// Problematic: a new instance is created and destroyed for every request
@RequestScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {

    @Inject
    @Channel("messages")
    Emitter<String> messageEmitter;

    @POST
    public void send(String message) {
        this.messageEmitter.send(message);
    }

    // Invoked whenever the bean is destroyed, i.e. after each request here
    @PreDestroy
    public void closeStream(){
        this.messageEmitter.complete();
    }
}

Changing the scope to ApplicationScoped solves the problem:


@ApplicationScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {
    //...
}
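
The effect of the scope can be illustrated without Quarkus or Kafka. The following is only a sketch under stated assumptions: FakeEmitter and Resource are hypothetical stand-ins (not SmallRye classes), and the sketch assumes that the emitter is shared across requests and cannot be used again once complete() was called. It counts how many sends succeed when the resource is destroyed after every request compared to once at shutdown:

```java
import java.util.ArrayList;
import java.util.List;

class EmitterLifecycleSketch {

    // Hypothetical stand-in for an Emitter: a shared stream that cannot be reopened
    static class FakeEmitter {
        private final List<String> sent = new ArrayList<>();
        private boolean completed;

        void send(String message) {
            if (completed) {
                throw new IllegalStateException("stream has been completed");
            }
            sent.add(message);
        }

        void complete() {
            completed = true;
        }
    }

    // Hypothetical resource: preDestroy() plays the role of the @PreDestroy method
    static class Resource {
        private final FakeEmitter emitter;

        Resource(FakeEmitter emitter) {
            this.emitter = emitter;
        }

        void send(String message) {
            emitter.send(message);
        }

        void preDestroy() {
            emitter.complete();
        }
    }

    // Request scope: a fresh resource per request, destroyed after the request.
    // Returns the number of successful sends.
    static int requestScoped(FakeEmitter emitter, int requests) {
        int succeeded = 0;
        for (int i = 0; i < requests; i++) {
            Resource resource = new Resource(emitter);
            try {
                resource.send("message " + i);
                succeeded++;
            } catch (IllegalStateException e) {
                // the shared emitter was already completed by an earlier request
            } finally {
                resource.preDestroy();
            }
        }
        return succeeded;
    }

    // Application scope: one resource for all requests, destroyed once at shutdown.
    // Returns the number of successful sends.
    static int applicationScoped(FakeEmitter emitter, int requests) {
        Resource resource = new Resource(emitter);
        int succeeded = 0;
        for (int i = 0; i < requests; i++) {
            resource.send("message " + i);
            succeeded++;
        }
        resource.preDestroy();
        return succeeded;
    }

    public static void main(String[] args) {
        System.out.println("request scoped: " + requestScoped(new FakeEmitter(), 3) + " of 3 sends succeeded");
        System.out.println("application scoped: " + applicationScoped(new FakeEmitter(), 3) + " of 3 sends succeeded");
    }
}
```

In this simulation only the first request-scoped send succeeds, whereas all application-scoped sends succeed, because complete() runs only once at the end.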

The example was tested with Quarkus (quarkus.io) and the SmallRye Apache Kafka connector, configured with mp.messaging.outgoing.messages.connector=smallrye-kafka.
