
Creating a custom Inbound Channel for a non-event based system

Target audience: Developers

Introduction

The Flowable event registry works seamlessly with message queue and event bus systems such as Kafka, RabbitMQ, JMS, and Amazon SQS. In some projects, however, the message queue or event bus in use is not supported (yet). Sometimes there is not even a real 'event' available.

Using events in BPMN and especially CMMN models is extremely powerful, so you may want to wrap a legacy, non-event-driven system and turn it into a source of events. The event registry pipeline concept is flexible enough to do exactly that.

This how-to describes how to write your own event integration and build an event-driven process or case without receiving events from a third-party system.

Creating a Channel Model

When you create a new channel model, you can choose whether it is an inbound or an outbound channel. For an inbound channel, you then choose the event registry implementation. If yours is not supported, select Custom, which lets you provide your own implementation. To do so, create a bean that implements InboundEventChannelAdapter. The remaining configuration is done as usual:

[Figure: Channel Configuration]

Implementing the Channel Model

The InboundEventChannelAdapter is a simple interface with just two setter methods. When the channel is deployed, these setters are called to pass in the InboundChannelModel and the EventRegistry. We can store both as fields and use them later, when an actual event is received:

import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.springframework.stereotype.Service;

@Service
public class PollingInboundEventChannelAdapter implements InboundEventChannelAdapter {

    private InboundChannelModel inboundChannelModel;
    private EventRegistry eventRegistry;

    @Override
    public void setInboundChannelModel(InboundChannelModel inboundChannelModel) {
        this.inboundChannelModel = inboundChannelModel;
    }

    @Override
    public void setEventRegistry(EventRegistry eventRegistry) {
        this.eventRegistry = eventRegistry;
    }

}

Now we can enhance this class and call eventRegistry.eventReceived whenever a new event occurs. This can be done either event by event as events arrive, or inside a loop that generates events from a polling result (e.g. for a legacy system):

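import java.util.Collections;
import java.util.List;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class PollingInboundEventChannelAdapter implements InboundEventChannelAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(PollingInboundEventChannelAdapter.class);

    private final ObjectMapper objectMapper;
    // Set when the channel is deployed, read later by the scheduler thread.
    private volatile InboundChannelModel inboundChannelModel;
    private volatile EventRegistry eventRegistry;

    public PollingInboundEventChannelAdapter(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    @Override
    public void setInboundChannelModel(InboundChannelModel inboundChannelModel) {
        this.inboundChannelModel = inboundChannelModel;
    }

    @Override
    public void setEventRegistry(EventRegistry eventRegistry) {
        this.eventRegistry = eventRegistry;
    }

    // Runs every 15 minutes, Monday to Friday (Spring scheduling must be enabled).
    @Scheduled(cron = "0 */15 * * * MON-FRI")
    public void poll() {
        if (eventRegistry == null || inboundChannelModel == null) {
            // The channel has not been deployed yet, so there is nowhere to send events.
            return;
        }
        List<JsonNode> events = executePollingAndReturnChangeSet();
        for (JsonNode event : events) {
            try {
                // Serialize each change and hand it to the event registry pipeline.
                String eventContent = this.objectMapper.writeValueAsString(event);
                this.eventRegistry.eventReceived(inboundChannelModel, eventContent);
            } catch (JsonProcessingException e) {
                LOGGER.error("Failed to convert event {}", event, e);
            }
        }
    }

    protected List<JsonNode> executePollingAndReturnChangeSet() {
        // Placeholder: implement the polling logic here and return the detected changes.
        return Collections.emptyList();
    }

}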

From an architectural point of view, you now only need to implement a caching mechanism that remembers what has already been reported, so that each event is triggered only once instead of again on every cron execution.
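One way to approach such a cache is to remember the last payload seen per record and report only records that are new or changed. The following is a minimal sketch under stated assumptions, not part of Flowable: the class name ChangeSetTracker, the method changedSince, and the idea that every polled record has a stable id and a non-null string payload are all hypothetical and chosen for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flowable): remembers the last payload seen
 * per record id and reports only records that are new or have changed.
 */
final class ChangeSetTracker {

    // Last known payload per record id, kept in memory between polls.
    private final Map<String, String> lastSeen = new HashMap<>();

    /**
     * @param snapshot current state of the polled system, keyed by record id
     *                 (assumption: payloads are never null)
     * @return payloads that are new or differ from the previous poll
     */
    synchronized List<String> changedSince(Map<String, String> snapshot) {
        List<String> changes = new ArrayList<>();
        for (Map.Entry<String, String> entry : snapshot.entrySet()) {
            // put() returns the previous payload, or null for an unknown id.
            String previous = lastSeen.put(entry.getKey(), entry.getValue());
            if (!entry.getValue().equals(previous)) {
                changes.add(entry.getValue());
            }
        }
        // Forget records that disappeared so the map does not grow forever.
        lastSeen.keySet().retainAll(snapshot.keySet());
        return changes;
    }
}
```

A polling method could pass each snapshot through such a tracker and forward only the returned payloads to the event registry. Because the state lives in memory, a restart would report every record again; a persistent store would be needed if that matters.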

This pattern can later be replaced with an event bus if the architecture evolves. If the event format stays the same, the process or case model might not even need to change: you simply deploy a new inbound channel that listens on a message queue.

Of course, instead of sending multiple events at once based on a cron expression, you can also register a custom listener here that receives events from an external source.
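A push-based variant might look roughly like the sketch below. It is an illustration only: PushEventForwarder, start, and onMessage are hypothetical names, and the Consumer<String> sink stands in for a call such as eventRegistry.eventReceived(inboundChannelModel, payload) so that the example stays self-contained.

```java
import java.util.function.Consumer;

/**
 * Hypothetical bridge (not part of Flowable): receives callbacks from an
 * external source and forwards each payload as soon as it arrives.
 */
final class PushEventForwarder {

    // Stand-in for: payload -> eventRegistry.eventReceived(inboundChannelModel, payload)
    private volatile Consumer<String> sink;

    /** Called once the channel is deployed and the event registry is available. */
    void start(Consumer<String> sink) {
        this.sink = sink;
    }

    /**
     * Callback invoked by the external source for every message.
     *
     * @return true if the payload was forwarded, false if it was dropped
     */
    boolean onMessage(String payload) {
        Consumer<String> target = this.sink;
        if (target == null || payload == null || payload.isBlank()) {
            return false; // channel not deployed yet, or nothing to forward
        }
        target.accept(payload);
        return true;
    }
}
```

In an adapter, start could be called from the two setters once both the channel model and the event registry are known. The sketch drops messages that arrive before that point; buffering them instead is a design choice that depends on the external source.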

Conclusion

The event registry enables powerful integrations with many different kinds of systems.

Even if you don't have a messaging system yet, you can still use the event registry to trigger events from an external source or to write a custom integration. For a complete overview of the event registry architecture, please refer to the event registry documentation.