Using Camel and RabbitMQ in an OSGi R7 application, including custom message types


https://ideas.into.software/engineering/2019/04/28/osgi-r7-camel-rabbitmq-custom-message-types.html

One of the many good practices that OSGi [1] frameworks impose is proper modularization of your code. Once your app is modularized, you have quite a few choices for communication between those modules. You can build your application "MonolithFirst" [2] and still decide up front on a communication style that will not hinder breaking it down into separate containers later, e.g. so that they can be deployed in a cluster.

With OSGi, of course, a direct communication style such as Declarative Services [3] could be your first choice. But what if some of the operations are long-running, so that an asynchronous communication style would be preferable? Additionally, what if you foresee that in the not-so-distant future you would like the option to move some of the modules responsible for those long-running operations into separate containers and deploy one or more instances of each in a Kubernetes [4] cluster?

For those reasons, I chose to use a message broker to facilitate this asynchronous communication and to keep the flexibility to migrate the application into a cluster whenever I choose to do so. Because of my previous experience with it and its versatility, I chose RabbitMQ [5]. In addition, to have the option of using some of the more popular Enterprise Integration Patterns [6] without having to implement them from scratch, I chose Camel [7], which, luckily, has a RabbitMQ component [8].

Since I found little to no documentation on integrating all these pieces in an OSGi R7 framework, this is one of several things I'd like to share from the material I gathered while putting together the Automated Linguistic Analysis application, https://github.com/ideas-into-software/automated-linguistic-analysis. It is a complete application showcasing the use of technologies such as:

  • OSGi R7 Promises for asynchronous generation of transcriptions and linguistic analyses
  • OSGi R7 Push Stream and JAX-RS Server Sent Events for push notifications of processing status
  • Apache Camel 2.23.1 and RabbitMQ 3.7 for asynchronous communication between services
  • JPA 2.1 and Hibernate 5.2.12, along with OSGi R7 JPA and Transaction Control services, for the persistence layer
  • OSGi R7 HTTP and JAX-RS Whiteboard for registering servlets, resources and REST controllers
  • OSGi R7 Configurator, Configuration Admin and Metatype services for automatic configuration of components
  • OSGi R7 Declarative Services for dependency injection
  • Maven automated build of Docker images
  • Maven automated deployment into Kubernetes cluster
  • RabbitMQ message broker as a StatefulSet
  • CockroachDB relational database as a StatefulSet

You can clone, configure and deploy the application yourself; all steps are documented at https://github.com/ideas-into-software/automated-linguistic-analysis.

Here, I will not cover the basics of OSGi, Bnd [9]/Bndtools [10], Camel or RabbitMQ. You can explore sites like OSGi enRoute [11], where you can find plenty of introductory material, as well as DZone [12] or blogs like Vogella [13]. I will focus only on what is most important for making these work together in this particular example, including the use of custom message types.

1. Since this is an OSGi application which uses Maven [14] with Bnd plugins, we start by adding the required dependencies. In the service-messaging-impl module, https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-messaging-impl, you will find the camel-scr and camel-rabbitmq dependencies, i.e.:

(...)
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-scr</artifactId>
  <version>2.23.1</version>
</dependency>
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-rabbitmq</artifactId>
  <version>2.23.1</version>
</dependency>
(...)

2. In the same service-messaging-impl module we add a class which bootstraps the OSGi implementation of the Camel Context (OsgiDefaultCamelContext), i.e. software.into.ala.service.messaging.impl.MessagingBootstrapService, which extends org.apache.camel.scr.AbstractCamelRunner provided by the camel-scr dependency.

3. We now switch to the mono-app module, https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/mono-app, which pulls together all the components required for our Monolith application. In the mono-app.bndrun run descriptor file we reference the service-messaging-impl module via the -runrequires instruction, so that all of the dependencies it brings in can be resolved properly, i.e.:

-runrequires: \
  osgi.identity;filter:='(osgi.identity=software.into.ala.service-messaging-impl)',\
(…)

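4. In the same mono-app module, in both the mono-app.bndrun and debug.bndrun run descriptor files, we use the -runproperties instruction to set the framework's boot delegation, so that the sun.* and com.sun.* packages which Camel uses are delegated to the parent (boot) class loader, i.e.: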

-runproperties: \
  org.osgi.framework.bootdelegation=sun.*,com.sun.*

Without this, as soon as you launch the app, you hit this error:

(…) Exception in thread "Camel Thread #0 - LRUCacheFactory" java.lang.NoClassDefFoundError: Could not initialize class org.apache.camel.com.github.benmanes.caffeine.cache.BoundedLocalCache
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at org.apache.camel.com.github.benmanes.caffeine.cache.LocalCacheFactory.newBoundedLocalCache(Unknown Source) (…) 
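To make the effect of the boot delegation property more concrete, here is a minimal, self-contained sketch in plain Java. It is a simplified model and not how Equinox or Felix implement class loading: the class name BootDelegationDemo and its methods are hypothetical, and the use of sun.misc.Unsafe as the example class is an assumption about what the shaded Caffeine cache needs, since the trace above does not show the root cause. The sketch only lets a class name through to the parent class loader when its package is java.* or matches one of the boot delegation patterns.

```java
/** Simplified model of OSGi boot delegation; real frameworks are more involved. */
final class BootDelegationDemo {

    /** True if the class's package is java.* or matches a boot delegation pattern. */
    static boolean isBootDelegated(String className, String bootDelegation) {
        int lastDot = className.lastIndexOf('.');
        String pkg = lastDot < 0 ? "" : className.substring(0, lastDot);
        if (pkg.startsWith("java.")) {
            return true; // java.* is always delegated to the parent loader
        }
        for (String raw : bootDelegation.split(",")) {
            String pattern = raw.trim();
            if (pattern.isEmpty()) {
                continue;
            }
            if (pattern.equals("*")) {
                return true;
            }
            if (pattern.endsWith(".*")) {
                // "sun.*" becomes the prefix "sun." so "sunshine" does not match
                String prefix = pattern.substring(0, pattern.length() - 1);
                if (pkg.startsWith(prefix)) {
                    return true;
                }
            } else if (pkg.equals(pattern)) {
                return true;
            }
        }
        return false;
    }

    /** Class loader that refuses every class name that is not boot-delegated. */
    static final class FilteringLoader extends ClassLoader {
        private final String bootDelegation;

        FilteringLoader(ClassLoader parent, String bootDelegation) {
            super(parent);
            this.bootDelegation = bootDelegation;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!isBootDelegated(name, bootDelegation)) {
                // Stands in for "no boot delegation and no Import-Package wire"
                throw new ClassNotFoundException(name + " is not visible to this loader");
            }
            return super.loadClass(name, resolve);
        }
    }

    /** Tries to load the class through a FilteringLoader with the given setting. */
    static boolean canLoad(String className, String bootDelegation) {
        ClassLoader loader = new FilteringLoader(ClassLoader.getSystemClassLoader(), bootDelegation);
        try {
            loader.loadClass(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cls = "sun.misc.Unsafe"; // assumed example of a sun.* class
        System.out.println("without boot delegation: " + canLoad(cls, ""));
        System.out.println("with sun.*,com.sun.*:    " + canLoad(cls, "sun.*,com.sun.*"));
    }
}
```

In a real framework a bundle could also reach such packages through an Import-Package wire to the system bundle; boot delegation is simply the least intrusive fix when the code in question cannot be changed.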

5. Now, for the last part: since we would like to use custom message types for our asynchronous communication via the RabbitMQ message broker, we turn to the service-messaging-dto module, https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-messaging-dto, which contains the classes serving as custom message types. In the POM of this module we attach the module as a fragment to the org.apache.camel.camel-rabbitmq bundle brought in by the camel-rabbitmq dependency, i.e.:

<plugin>
  <groupId>biz.aQute.bnd</groupId>
  <artifactId>bnd-maven-plugin</artifactId>
  <configuration>
    <bnd><![CDATA[
      Fragment-Host: org.apache.camel.camel-rabbitmq
      Bundle-SymbolicName: ${project.groupId}.${project.artifactId}
      -sources: true
      -contract: *
    ]]></bnd>
  </configuration>
</plugin>

This is required so that these custom message types are visible to the camel-rabbitmq bundle, which deserializes incoming message bodies using its own class loader. Without this, as soon as the modules start communicating with each other, you hit the following error:

(…) RabbitMQConsumer Error processing exchange. Exchange[]. Caused by: [java.lang.ClassNotFoundException - software.into.ala.service.messaging.dto.FileMessageDTO cannot be found by org.apache.camel.camel-rabbitmq_2.23.1]
java.lang.ClassNotFoundException: software.into.ala.service.messaging.dto.FileMessageDTO cannot be found by org.apache.camel.camel-rabbitmq_2.23.1
    at org.eclipse.osgi.internal.loader.BundleLoader.findClassInternal(BundleLoader.java:511)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:422)
    at org.eclipse.osgi.internal.loader.BundleLoader.findClass(BundleLoader.java:414)
    at org.eclipse.osgi.internal.loader.ModuleClassLoader.loadClass(ModuleClassLoader.java:153)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:677)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
    at org.apache.camel.component.rabbitmq.RabbitMQMessageConverter.deserializeBody(RabbitMQMessageConverter.java:287)
    at org.apache.camel.component.rabbitmq.RabbitMQMessageConverter.populateMessageBody(RabbitMQMessageConverter.java:276)
    at org.apache.camel.component.rabbitmq.RabbitMQMessageConverter.populateRabbitExchange(RabbitMQMessageConverter.java:226)
    at org.apache.camel.component.rabbitmq.RabbitMQEndpoint.createRabbitExchange(RabbitMQEndpoint.java:204)
    at org.apache.camel.component.rabbitmq.RabbitConsumer.doHandleDelivery(RabbitConsumer.java:92)
    at org.apache.camel.component.rabbitmq.RabbitConsumer.handleDelivery(RabbitConsumer.java:79)
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748) (…)
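The trace above shows the failure happening inside ObjectInputStream.readObject, called from RabbitMQMessageConverter.deserializeBody: Java deserialization can only rebuild an object if the class loader doing the work can see the object's class. A fragment shares its host bundle's class loader, which is why attaching the DTO module to camel-rabbitmq resolves the problem. The following self-contained sketch reproduces the same mechanism in plain Java, without OSGi or Camel. The names SerializationVisibilityDemo and DemoMessage are hypothetical stand-ins (DemoMessage plays the role of a type like FileMessageDTO), and the isolated class loader is only an approximation of a bundle that has no wire to the DTO package.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class SerializationVisibilityDemo {

    /** Hypothetical stand-in for a custom message type such as FileMessageDTO. */
    static final class DemoMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String fileId;

        DemoMessage(String fileId) {
            this.fileId = fileId;
        }
    }

    /** Serializes a message the way a producer would before publishing it. */
    static byte[] serialize(Serializable message) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Deserializes the payload, resolving classes only through the given loader. */
    static Object deserialize(byte[] payload, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, loader);
            }
        }) {
            return in.readObject();
        }
    }

    /** True if the payload can be turned back into a DemoMessage via the loader. */
    static boolean canDeserialize(byte[] payload, ClassLoader loader) {
        try {
            return deserialize(payload, loader) instanceof DemoMessage;
        } catch (IOException | ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] payload = serialize(new DemoMessage("file-1"));
        // Loader with no parent: sees bootstrap classes only, like a bundle without the DTO package
        ClassLoader isolated = new ClassLoader(null) { };
        System.out.println("isolated loader:          " + canDeserialize(payload, isolated));
        System.out.println("loader that sees the DTO: "
                + canDeserialize(payload, DemoMessage.class.getClassLoader()));
    }
}
```

The consuming side fails exactly as in the trace when the class is out of reach, and succeeds once the deserializing class loader can see the message type.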

[1]: OSGi, https://www.osgi.org/

[2]: MonolithFirst, https://martinfowler.com/bliki/MonolithFirst.html

[3]: OSGi Compendium R7 Declarative Services Specification, https://osgi.org/specification/osgi.cmpn/7.0.0/service.component.html

[4]: Kubernetes, https://kubernetes.io/

[5]: RabbitMQ, https://www.rabbitmq.com/

[6]: Enterprise Integration Patterns, https://www.enterpriseintegrationpatterns.com/

[7]: Apache Camel, http://camel.apache.org/

[8]: RabbitMQ Component, http://camel.apache.org/rabbitmq.html

[9]: bnd, https://bnd.bndtools.org/

[10]: Bndtools, https://bndtools.org/

[11]: OSGi enRoute, https://enroute.osgi.org/

[12]: DZone, https://dzone.com/

[13]: Vogella Blog, http://blog.vogella.com/category/osgi/

[14]: Maven, https://maven.apache.org/
