java.lang.IllegalStateException: SRMSG00028: The subscription to ... has been cancelled
at io.smallrye.reactive.messaging.extension.AbstractEmitter.verify(AbstractEmitter.java:165)
at io.smallrye.reactive.messaging.extension.AbstractEmitter.emit(AbstractEmitter.java:144)
at io.smallrye.reactive.messaging.extension.EmitterImpl.send(EmitterImpl.java:29)
at airhacks.kafka.jaxrs.boundary.MessagesResource.send(MessagesResource.java:34)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass.send$$superaccessor1(MessagesResource_Subclass.zig:209)
at airhacks.kafka.jaxrs.boundary.MessagesResource_Subclass$$function$$3.apply(MessagesResource_Subclass$$function$$3.zig:33)
This exception is caused by an Emitter injected
into a RequestScoped managed bean:
import javax.annotation.PreDestroy;
import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.context.RequestScoped;
import javax.inject.Inject;
import javax.ws.rs.*;
import javax.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import org.eclipse.microprofile.reactive.messaging.Emitter;

@RequestScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {

    @Inject
    @Channel("messages")
    Emitter<String> messageEmitter;

    @POST
    public void send(String message) {
        this.messageEmitter.send(message);
    }

    // invoked whenever the container destroys this bean instance
    @PreDestroy
    public void closeStream() {
        this.messageEmitter.complete();
    }
}
Changing the scope to ApplicationScoped solves the problem:
@ApplicationScoped
@Path("messages")
@Consumes(MediaType.TEXT_PLAIN)
public class MessagesResource {
    //...
}
The example was tested with Quarkus (quarkus.io) and a configured SmallRye Apache Kafka connector (mp.messaging.outgoing.messages.connector=smallrye-kafka).
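The effect of the scope change can be reproduced without Kafka or Quarkus. The following plain-Java sketch is not SmallRye code: SimulatedEmitter and Resource are hypothetical stand-ins. It assumes that all bean instances share a single emitter per channel, and that the container calls the @PreDestroy method of a RequestScoped bean at the end of every request:

```java
import java.util.ArrayList;
import java.util.List;

public class EmitterScopeDemo {

    // Hypothetical stand-in for an emitter: refuses to send after complete().
    static class SimulatedEmitter {
        private final List<String> sent = new ArrayList<>();
        private boolean completed;

        void send(String message) {
            if (completed) {
                throw new IllegalStateException("The subscription has been cancelled");
            }
            sent.add(message);
        }

        void complete() {
            completed = true;
        }

        List<String> sent() {
            return sent;
        }
    }

    // Mirrors MessagesResource: closeStream() plays the role of @PreDestroy.
    static class Resource {
        private final SimulatedEmitter emitter;

        Resource(SimulatedEmitter emitter) {
            this.emitter = emitter;
        }

        void send(String message) {
            emitter.send(message);
        }

        void closeStream() {
            emitter.complete();
        }
    }

    // Request scope (assumed): a fresh Resource per request, destroyed afterwards.
    static boolean secondRequestFailsWithRequestScope() {
        SimulatedEmitter shared = new SimulatedEmitter();
        Resource first = new Resource(shared);
        first.send("one");
        first.closeStream(); // end of first request completes the shared emitter
        Resource second = new Resource(shared);
        try {
            second.send("two");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Application scope (assumed): one Resource serves all requests.
    static int messagesSentWithApplicationScope() {
        SimulatedEmitter shared = new SimulatedEmitter();
        Resource resource = new Resource(shared);
        resource.send("one");
        resource.send("two");
        resource.closeStream(); // happens only once, at shutdown
        return shared.sent().size();
    }

    public static void main(String[] args) {
        System.out.println("request scope, second send fails: "
                + secondRequestFailsWithRequestScope());
        System.out.println("application scope, messages sent: "
                + messagesSentWithApplicationScope());
    }
}
```

Under these assumptions the first request completes the shared emitter, so the second request fails. With a single long-lived instance, complete() runs only at shutdown.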
Quarkus started in development mode (quarkus:dev) with an exposed JAX-RS endpoint opens two ports by default: 8080 (http) and 5005 (debug).
Every additional instance you launch needs its own distinct ports.
The http port is configurable in application.properties (quarkus.http.port=8282) or via environment entries
(not suitable for our purpose). The debug port can be changed (-Ddebug=6000)
or deactivated from the command line only: mvn compile -Ddebug=false quarkus:dev
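Picking distinct ports by hand gets tedious with several instances. The following is a minimal sketch, not part of Quarkus: DevPorts and its methods are hypothetical helpers. It asks the operating system for two unused ports and prints the property and the command line described above. The ports are only known to be free at the moment of the check, so another process could still claim them before Quarkus starts:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

public class DevPorts {

    // Binds two sockets to port 0 at the same time, so the OS must
    // hand out two different, currently unused ports.
    static int[] twoFreePorts() {
        try (ServerSocket first = new ServerSocket(0);
             ServerSocket second = new ServerSocket(0)) {
            return new int[] {first.getLocalPort(), second.getLocalPort()};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Entry for application.properties, as mentioned in the text.
    static String httpPortProperty(int httpPort) {
        return "quarkus.http.port=" + httpPort;
    }

    // Launch command with the -Ddebug flag, as mentioned in the text.
    static String launchCommand(int debugPort) {
        return "mvn compile -Ddebug=" + debugPort + " quarkus:dev";
    }

    public static void main(String[] args) {
        int[] ports = twoFreePorts();
        System.out.println(httpPortProperty(ports[0]));
        System.out.println(launchCommand(ports[1]));
    }
}
```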
Updating from Java 14 with --enable-preview to Java 15, fixing Apache CXF's:
java.lang.IllegalAccessError: class org.apache.cxf.microprofile.client.MicroProfileClientProviderFactory$$Lambda$345/0x0000000800c309b0 tried to access protected method 'int org.apache.cxf.jaxrs.provider.ProviderFactory.compareCustomStatus(org.apache.cxf.jaxrs.model.ProviderInfo, org.apache.cxf.jaxrs.model.ProviderInfo)' (org.apache.cxf.microprofile.client.MicroProfileClientProviderFactory$$Lambda$345/0x0000000800c309b0 and org.apache.cxf.jaxrs.provider.ProviderFactory are in unnamed module of loader 'app'
creating Java Command Line Applications with and without jbang,
new ways to launch and build Java applications with minimal requirements,
how jbang happened, combining quarkus and jbang, and building serverless applications with quarkus and jbang