Implementing asynchronous processing with OSGi R7 Promises

https://ideas.into.software/engineering/2019/04/28/osgi-r7-promises-asynchronous-processing.html

One of the 12+ OSGi R7 specifications utilized in the Automated Linguistic Analysis application I put together https://github.com/ideas-into-software/automated-linguistic-analysis is OSGi R7 Promises. You can read the spec[1], but examples of real applications using this technology are practically non-existent in public repositories.

In my previous article, “Using Camel and RabbitMQ in an OSGi R7 application, including custom message types”, I provided background information on the technologies used in this application. In this article, I would like to fill that gap and give an overview of how I implemented asynchronous processing with OSGi R7 Promises. You can clone, configure and deploy the application yourself–all steps are documented in the repository: https://github.com/ideas-into-software/automated-linguistic-analysis.

For this particular use case–i.e. long-running speech-to-text operations utilizing the IBM Watson Speech to Text service–having this communication happen in a non-blocking way was clearly a necessity, so that e.g. someone analyzing the famous Charlie Chaplin speech[2] would not prevent someone else from analyzing a speech only a few seconds long; instead, both could execute in parallel. I could have used Java 8's native abstractions for asynchronous programming, but this seemed like a good time to apply OSGi R7's native abstraction, i.e. Promises.
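For readers new to the API, here is a minimal, self-contained sketch of the core Deferred/Promise pattern (illustrative only–none of these names come from the application): the producing side resolves a Deferred on a worker thread, while the consuming side only ever holds the Promise.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.osgi.util.promise.Deferred;
import org.osgi.util.promise.Promise;

// Minimal Deferred/Promise sketch; names are illustrative, not from the application
public class PromisePrimer {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Deferred<String> deferred = new Deferred<>();
    Promise<String> promise = deferred.getPromise();

    // Producer: resolves (or fails) the Deferred from a worker thread
    executor.execute(() -> {
      try {
        deferred.resolve("transcript");
      } catch (Exception e) {
        deferred.fail(e);
      }
    });

    // Consumer: registers a callback instead of blocking on the result
    promise.onSuccess(r -> System.out.println("Resolved: " + r));
    executor.shutdown();
  }
}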

1. The service-transcription-impl module https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-transcription-impl contains all the code mentioned in this article. All you will need in addition is your own IBM Watson account and an instance of their Speech to Text service–luckily, IBM provides free accounts[3] for small footprint instances. Having these, you will need to configure the API key and endpoint address in one of the OSGi R7 Configurator[4] configuration.json files: in the mono-app module https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/mono-app if using the monolith version of this application, or in the k8-transcription-app module https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/k8-transcription-app if using the cluster version.
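For reference, a Configurator configuration.json takes roughly the following shape–the PID and property names below are placeholders, not the actual keys used by the application (check the respective configuration.json in the repository for those):

{
  ":configurator:resource-version": 1,
  "transcription.service.config": {
    "watson.apiKey": "<your IBM Watson API key>",
    "watson.endpoint": "<your Speech to Text endpoint URL>"
  }
}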

2. Aside from the required IBM Watson SDK and Speech to Text dependencies, which you will find in the POM file of the service-transcription-impl module https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-transcription-impl, no other dependencies are required–the OSGi enRoute dependencies pull in everything else, including Promises, with no extra work on our part.

(…) 
<dependency>
    <groupId>com.ibm.watson.developer_cloud</groupId>
    <artifactId>core</artifactId>
    <version>${ibmwatson.version}</version>
</dependency>
<dependency>
    <groupId>com.ibm.watson.developer_cloud</groupId>
    <artifactId>speech-to-text</artifactId>
    <version>${ibmwatson.version}</version>
</dependency>
(…) 

3. In the same service-transcription-impl module https://github.com/ideas-into-software/automated-linguistic-analysis/tree/master/service-transcription-impl you will find the software.into.ala.service.transcription.impl.TranscriptionServiceImpl class, which contains all of the business logic. Here, we start by implementing the private class TranscriptionWorker, which wraps the long-running operation as a Runnable and sets the result–or the error, in case of failure–on the Deferred instance passed in via the constructor, i.e.:

try (InputStream fileIs = Files.newInputStream(filePath)) {
  RecognizeOptions recognizeOptions = new RecognizeOptions.Builder()
      .audio(fileIs)
      .contentType(fileFormat.getMime())
      .timestamps(false)
      .build();
  // Blocking call to the IBM Watson Speech to Text service;
  // safe here, since we are on a dedicated worker thread
  SpeechRecognitionResults results = speechToText.recognize(recognizeOptions).execute();
  deferred.resolve(results);
} catch (Throwable t) {
  // Propagate any failure to the Promise side
  deferred.fail(t);
}
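For orientation, the try block above lives inside the worker's run() method; the enclosing class is shaped roughly as follows (parameter types other than Deferred are placeholders here–see TranscriptionServiceImpl for the real signature):

// Sketch of the enclosing worker class; only the Deferred wiring is shown
private class TranscriptionWorker implements Runnable {

  private final Deferred<SpeechRecognitionResults> deferred;
  // ... configuration, IAM options and file id fields omitted ...

  TranscriptionWorker(Object configuration, Object iAmOptions, String fileId,
      Deferred<SpeechRecognitionResults> deferred) {
    this.deferred = deferred;
    // ... assign the remaining fields ...
  }

  @Override
  public void run() {
    // the try/catch block shown above goes here:
    // resolve the Deferred on success, fail it on error
  }
}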

4. Then, for each transcription request, we start a new instance of TranscriptionWorker, passing in a Deferred instance which will hold the result of the transcription–or the error, in case of failure–i.e.:

private Promise<SpeechRecognitionResults> transcribeAsync(String fileId) {
  final Deferred<SpeechRecognitionResults> deferred = new Deferred<>();
  // Hand the long-running work off to the executor; the caller gets the Promise immediately
  this.transcriptionsExec.execute(
      new TranscriptionWorker(this.configuration, this.iAmOptions, fileId, deferred));
  return deferred.getPromise();
}
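As a side note, the R7 Promise API also provides org.osgi.util.promise.PromiseFactory, which can own the executor and remove the Deferred plumbing entirely. A sketch of that alternative follows–this is not what the application does, and transcribeBlocking below is a hypothetical synchronous helper:

// Alternative sketch using PromiseFactory (OSGi R7) instead of Deferred + Executor;
// transcribeBlocking(fileId) is a hypothetical synchronous helper, not from the app
private final PromiseFactory promiseFactory =
    new PromiseFactory(Executors.newFixedThreadPool(4));

private Promise<SpeechRecognitionResults> transcribeAsync(String fileId) {
  return promiseFactory.submit(() -> transcribeBlocking(fileId));
}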

5. For processing the results obtained and passed via the Deferred, we implement a callback method for the success scenario in software.into.ala.service.transcription.impl.TranscriptionServiceImpl.processResult(String, SpeechRecognitionResults).

6. Failures do happen as well, and for these we implement another callback method, for the failure scenario, in software.into.ala.service.transcription.impl.TranscriptionServiceImpl.handleFailure(String, Throwable). Both callbacks are sketched below.
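Both callbacks are plain private methods on the service class; their bodies are application-specific, so only the shapes are sketched here:

// Shapes only; the real implementations live in TranscriptionServiceImpl
private void processResult(String fileId, SpeechRecognitionResults results) {
  // e.g. extract the transcript from results and publish it downstream for fileId
}

private void handleFailure(String fileId, Throwable t) {
  // e.g. log the error and mark the transcription request for fileId as failed
}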

7. We then delegate each incoming transcription request to our asynchronous processing method, attaching our success and failure callback methods in software.into.ala.service.transcription.impl.TranscriptionServiceImpl.transcribe(FileMessageDTO), i.e.:

(…) 
Promise<SpeechRecognitionResults> results = transcribeAsync(message.fileId);
results.onSuccess(r -> processResult(message.fileId, r));
results.onFailure(t -> handleFailure(message.fileId, t));
(…)
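Since onSuccess and onFailure each return the Promise they were invoked on, the two registrations can equally be written as a single chain:

transcribeAsync(message.fileId)
    .onSuccess(r -> processResult(message.fileId, r))
    .onFailure(t -> handleFailure(message.fileId, t));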

[1]: “OSGi Enterprise R7 Promises Specification”, https://osgi.org/specification/osgi.enterprise/7.0.0/util.promise.html

[2]: “Charlie Chaplin - Famous speech”, https://soundcloud.com/omar_abdel_aziz1/charlie-chaplin-famous-speech-adolf-hitlers-st

[3]: “IBM Watson Speech to Text”, https://www.ibm.com/watson/services/speech-to-text/

[4]: “OSGi Compendium R7 Configurator Specification”, https://osgi.org/specification/osgi.cmpn/7.0.0/service.configurator.html
