Integration refers to the process of combining software parts (or subsystems) into one system. An integration framework is a lightweight utility that provides libraries and standardized methods to coordinate messaging among different technologies. As software connects the world in increasingly complex ways, integration is what makes it all possible by facilitating app-to-app communication. Learn more about this necessity for modern software development by keeping a pulse on industry topics such as integrated development environments, API best practices, service-oriented architecture, enterprise service buses, communication architectures, integration testing, and more.
In today's data-driven world, the quest for efficient and flexible database solutions is an ongoing pursuit for developers and businesses alike. One such solution is HarperDB. HarperDB is a modern and versatile database management system built for simplicity, speed, and scalability. In this article, we will delve into the world of HarperDB, exploring why it has gained popularity and what makes it a compelling choice for developers and organizations. Additionally, we will take our first steps towards integrating HarperDB with the Java programming language. Java is a widely adopted, robust, and platform-independent programming language known for its reliability in building diverse applications. By bridging the gap between HarperDB and Java, we will unlock many possibilities for managing and accessing data seamlessly. So, join us on this journey as we unravel HarperDB and embark on our first integration with plain Java. Discover how this combination can empower you to build efficient and responsive applications, streamline data management, and take your development projects to the next level. HarperDB: A Modern Database Solution HarperDB blends the simplicity of traditional functionality with the power and flexibility required by modern applications. Essentially, HarperDB is a globally distributed edge application platform composed of an edge database, streaming broker, and user-defined applications, with near-zero latency, significant cost savings, and a superior developer experience. This versatility makes it a compelling option for businesses and developers grappling with the complexities of managing diverse data sources. HarperDB can run anywhere from edge to cloud, with a user-friendly management interface that enables developers of any skill level to get up and running quickly. Unlike many traditional databases that require extensive setup, configuration, and database administration expertise, HarperDB streamlines these processes. This simplicity reduces the learning curve and saves valuable development time, allowing teams to focus on building applications rather than managing the database. Performance is critical to any database system, especially in today's real-time and data-intensive applications. HarperDB's architecture is designed for speed and scale, ensuring that data retrieval and processing happen at lightning speed. HarperDB offers horizontal scalability, allowing you to add resources seamlessly as your data grows. HarperDB goes beyond pigeonholing data into predefined structures. This flexibility is particularly valuable in today's data landscape, where information comes in diverse formats. With HarperDB, you can store, query, and analyze data in a way that aligns with your application's unique requirements without being constrained by rigid schemas. HarperDB enables cost savings in numerous ways. The ease of use and low maintenance requirements translate into reduced operational expenses. Additionally, HarperDB delivers the same throughput as existing solutions with less hardware (or greater throughput with the same amount of hardware). As we delve deeper into HarperDB's integration with Java, we will unlock the potential of this database system and explore how it can elevate your data projects to new heights. Installing HarperDB Locally In our exploration of HarperDB and its integration with Java, one of the first steps is to install HarperDB locally. While a cloud version is available, this article focuses on the local installation to provide you with hands-on experience.
You can choose your preferred flavor and installation method from the official documentation here. However, for simplicity, we'll demonstrate how to set up HarperDB using Docker, a popular containerization platform. Docker Installation Docker simplifies the process of installing and running HarperDB in a containerized environment. Please note that the following Docker command is for demonstration purposes and should not be used in production. In production, you should follow best practices for securing your database credentials. Here's how to run HarperDB in a Docker container with a simple username and password:
Shell
docker run -d \
  -e HDB_ADMIN_USERNAME=root \
  -e HDB_ADMIN_PASSWORD=password \
  -e HTTP_THREADS=4 \
  -p 9925:9925 \
  -p 9926:9926 \
  harperdb/harperdb
Let's break down what this command does: -d: Runs the container in detached mode (in the background) -e HDB_ADMIN_USERNAME=root: Sets the admin username to root (you can change this) -e HDB_ADMIN_PASSWORD=password: Sets the admin password to password (remember to use a robust and secure password in production) -e HTTP_THREADS=4: Configures the number of HTTP threads for handling requests -p 9925:9925 and -p 9926:9926: Maps the container's internal ports 9925 and 9926 to the corresponding ports on your host machine This local installation will serve as the foundation for exploring HarperDB's capabilities and its integration with Java. In subsequent sections, we will dive deeper into using HarperDB and connecting it with Java to leverage its features for building robust and data-driven applications. Creating Schema, Table, and Fields in HarperDB Now that we have HarperDB running locally, let's create a schema and table and define the fields for our "dev" schema and "person" table. We'll perform these operations using HTTP requests. Please note that the authorization header in these requests uses Basic authentication with the username "root" and password "password". In a production environment, always ensure secure authentication methods. In our example, we'll create a "dev" schema and a "person" table with "id", "name", and "age" columns. We'll use curl commands for this purpose. Before running these commands, ensure that your HarperDB Docker container is up and running, as explained earlier. Creating a Schema ('dev'):
Shell
curl --location --request POST 'http://localhost:9925/' \
  --header 'Authorization: Basic cm9vdDpwYXNzd29yZA==' \
  --header 'Content-Type: application/json' \
  --data-raw '{
    "operation": "create_schema",
    "schema": "dev"
  }'
This command sends an HTTP POST request to create a dev schema. The authorization header includes the basic authentication credentials (Base64 encoded username and password). Replace cm9vdDpwYXNzd29yZA== with your base64-encoded credentials. Creating a "person" Table With "id" as the Hash Attribute:
Shell
curl --location 'http://localhost:9925' \
  --header 'Authorization: Basic cm9vdDpwYXNzd29yZA==' \
  --header 'Content-Type: application/json' \
  --data '{
    "operation": "create_table",
    "schema": "dev",
    "table": "person",
    "hash_attribute": "id"
  }'
This command creates a "person" table within the "dev" schema and designates the "id" column as the hash attribute. The "hash_attribute" is used for distributed data storage and retrieval.
Creating "name" and "age" Columns in the "person" Table: Shell curl --location 'http://localhost:9925' \ --header 'Authorization: Basic cm9vdDpwYXNzd29yZA==' \ --header 'Content-Type: application/json' \ --data '{ "operation": "create_attribute", "schema": "dev", "table": "person", "attribute": "name" }' curl --location 'http://localhost:9925' \ --header 'Authorization: Basic cm9vdDpwYXNzd29yZA==' \ --header 'Content-Type: application/json' \ --data '{ "operation": "create_attribute", "schema": "dev", "table": "person", "attribute": "age" }' These two commands create "name" and "age" columns within the "person" table. These columns define the structure of your data. With these HTTP requests, you’ve set up the schema, table, and columns in your local HarperDB instance. You are now ready to start working with data and exploring how to integrate HarperDB with Java for powerful data-driven applications. Exploring the Java Code for HarperDB Integration This session will explore the Java code to integrate HarperDB into a plain Java SE (Standard Edition) application. We will create a simple “Person” entity with “id”, “name”, and “age” fields. We must set up a Maven project and include the HarperDB JDBC driver to start. Step 1: Create a Maven Project Begin by creating a new Maven project using the Maven Quickstart Archetype. You can use the following command to create the project: Shell mvn archetype:generate -DgroupId=com.example -DartifactId=harperdb-demo -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false This command will generate a basic Maven project structure. Navigate to the project root directory. Step 2: Include the HarperDB JDBC Driver Download the HarperDB JDBC driver from the official HarperDB resources page: HarperDB Drivers. Extract the contents of the downloaded ZIP file. Create a new folder named lib in your project root directory. Copy the HarperDB JDBC driver JAR file from the extracted contents and paste it into the lib folder. Step 3: Update the Maven POM File Open the pom.xml file in your project. Add the following Maven dependency to include the HarperDB JDBC driver. Make sure to adjust the <version> and <systemPath> to match your JAR file: XML <dependency> <groupId>cdata.jdbc.harperdb</groupId> <artifactId>cdata.jdbc.harperdb</artifactId> <scope>system</scope> <version>1.0</version> <systemPath>${project.basedir}/lib/cdata.jdbc.harperdb.jar</systemPath> </dependency> This dependency instructs Maven to include the HarperDB JDBC driver JAR file as a system dependency for your project. Creating a Person Record and a PersonDAO Class for HarperDB Integration We will create a Person record, an immutable class introduced in Java for data modeling. We will also implement a PersonDAO class to interact with HarperDB using direct JDBC API calls. 1. Create the Person Record First, we define the Person record with three attributes: id, name, and age. We also provide a static factory method of for creating Person instances. This record simplifies data modeling and reduces code by automatically generating a constructor, accessor methods, and equals() and hashCode() implementations. Java public record Person(String id, String name, Integer age) { public static Person of(String name, Integer age) { return new Person(null, name, age); } } 2. Create the PersonDAO Class Next, we create the PersonDAO class, responsible for database operations using the HarperDB JDBC driver. 
This class provides methods for inserting, finding by ID, deleting, and retrieving all Person records from the database. Java import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.Properties; public class PersonDAO { private static final String INSERT = "INSERT INTO dev.person (name, age) VALUES (?, ?)"; private static final String SELECT = "select * From dev.person"; private static final String FIND_ID = "select * From dev.person where id = ?"; private static final String DELETE = "delete From dev.person where id = ?"; public void insert(Person person) throws SQLException { try(Connection connection = createConnection()){ var statement = connection.prepareStatement(INSERT); statement.setString(1, person.name()); statement.setInt(2, person.age()); statement.execute(); } } public Optional<Person> findById(String id) throws SQLException { try(Connection connection = createConnection()) { var statement = connection.prepareStatement(FIND_ID); statement.setString(1, id); var resultSet = statement.executeQuery(); if(resultSet.next()) { var name = resultSet.getString("name"); var age = resultSet.getInt("age"); return Optional.of(new Person(id, name, age)); } return Optional.empty(); } } public void delete(String id) throws SQLException { try(Connection connection = createConnection()) { var statement = connection.prepareStatement(DELETE); statement.setString(1, id); statement.execute(); } } public List<Person> findAll() throws SQLException { List<Person> people = new ArrayList<>(); try(Connection connection = createConnection()) { var statement = connection.prepareStatement(SELECT); var resultSet = statement.executeQuery(); while (resultSet.next()) { var id = resultSet.getString("id"); var name = resultSet.getString("name"); var age = resultSet.getInt("age"); people.add(new Person(id, name, age)); } } return people; } static Connection createConnection() throws SQLException { var properties = new Properties(); properties.setProperty("Server","http://localhost:9925/"); properties.setProperty("User","root"); properties.setProperty("Password","password"); return DriverManager.getConnection("jdbc:harperdb:", properties); } } With the Person record and PersonDAO class in place, you can now interact with HarperDB using Java, performing operations such as inserting, finding by ID, deleting, and retrieving Person records from the database. Adjust the database connection properties in the createConnection method to match your HarperDB setup. Executing the Java Application with HarperDB Integration With your Person record and PersonDAO class in place, you can execute the Java application to interact with HarperDB. Here’s your App class for implementing the application: Java import java.sql.SQLException; import java.util.List; public class App { public static void main(String[] args) throws SQLException { PersonDAO dao = new PersonDAO(); dao.insert(Person.of( "Ada", 10)); dao.insert(Person.of("Poliana", 20)); dao.insert(Person.of("Jhon", 30)); List<Person> people = dao.findAll(); people.forEach(System.out::println); System.out.println("Find by id: "); var id = people.get(0).id(); dao.findById(id).ifPresent(System.out::println); dao.delete(id); System.out.println("After delete: is present? " + dao.findById(id).isPresent()); } private App() { } } In this App class: We create an instance of the PersonDAO class to interact with the database. 
We insert sample Person records using the dao.insert(...) method. We retrieve all Person records using dao.findAll() and print them. We find a Person by ID and print it using dao.findById(...). We delete a Person by ID using dao.delete(...) and then check if it’s still in the database. Executing this App class will perform these operations against your HarperDB database, demonstrating how your Java application can interact with HarperDB using the Person record and PersonDAO class for database operations. Make sure to have HarperDB running and the HarperDB JDBC driver adequately configured in your project, as mentioned earlier in the article. Conclusion In our journey to explore HarperDB and its integration with Java, we’ve discovered a versatile and modern database solution that combines simplicity, speed, and flexibility to meet a wide range of data management needs. In our conclusion, we recap what we’ve learned and highlight the resources available for further exploration. Next Steps Documentation: For a deeper dive into HarperDB’s features and capabilities, consult the official documentation at HarperDB Documentation (linked earlier in this article). Sample Code: Explore practical examples and sample code for integrating HarperDB with Java in the HarperDB Samples GitHub Repository. Incorporating HarperDB into your Java applications empowers you to manage data efficiently, make informed decisions in real time, and build robust, data-driven solutions. Whether you’re developing IoT applications, web and mobile apps, or a global gaming solution, HarperDB is a modern and accessible choice.
In enterprises, SREs, DevOps, and cloud architects often discuss which platform to choose for observability for faster troubleshooting of issues and a better understanding of the performance of their production systems. There are certain questions they need to answer to get maximum value for their team, such as: Will an observability tool support all kinds of workloads and heterogeneous systems? Will the tool support all kinds of data aggregation, such as logs, metrics, traces, topology, etc.? Will the investment in the (ongoing or new) observability tool be justified? In this article, we will provide the best way to get started with unified observability of your entire infrastructure using the open-source Apache Skywalking and Istio service mesh. Istio Service Mesh of a Multi-Cloud Application Let us take a multi-cloud example where multiple services are hosted on on-prem or managed Kubernetes clusters. The first step for unified observability will be to form a service mesh using Istio service mesh. The idea is that all the services or workloads in Kubernetes clusters (or VMs) should be accompanied by an Envoy proxy to abstract the security and networking out of the business logic. As you can see in the below image, a service mesh is formed, and the network communication between edge to workloads, among workloads, and between clusters is controlled by the Istio control plane. In this case, the Istio service mesh emits logs, metrics, and traces for each Envoy proxy, which helps to achieve unified observability. We need a visualization tool like Skywalking to collect the data and populate it for granular observability. Why Skywalking for Observability SREs from large companies such as Alibaba, Lenovo, ABInBev, and Baidu use Apache Skywalking, and the common reasons are: Skywalking aggregates logs, metrics, traces, and topology. It natively supports popular service mesh software like Istio. While other tools may not support getting data from Envoy sidecars, Skywalking supports sidecar integration. It supports OpenTelemetry (OTel) standards for observability. These days, OTel standards and instrumentation are popular for MTL (metrics, traces, logs). Skywalking supports observability-data collection from almost all the elements of the full stack: database, OS, network, storage, and other infrastructure. It is open-source and free (with an affordable enterprise version). Now, let us see how to integrate Istio and Apache Skywalking into your enterprise. Steps To Integrate Istio and Apache Skywalking We have created a demo to establish the connection between the Istio data plane and Skywalking, where it will collect data from Envoy sidecars and populate them in the observability dashboards. Note: By default, Skywalking comes with predefined dashboards for Apache APISIX and AWS Gateways. Since we are using the Istio Gateway, it will not get a dedicated dashboard out-of-the-box, but we'll get metrics for it in other locations. If you want to watch the video, check out my latest Istio-Skywalking configuration video. You can refer to the GitHub link here. Step 1: Add Kube-State-Metrics to Collect Metrics From the Kubernetes API Server We have installed the kube-state-metrics service to listen to the Kubernetes API server and send those metrics to Apache Skywalking. First, add the Prometheus community repo:
Shell
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
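After adding the repo, run helm repo update so that Helm fetches the latest chart versions:
Shell
helm repo update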
And now you can install kube-state-metrics.
Shell
helm install kube-state-metrics prometheus-community/kube-state-metrics
Step 2: Install Skywalking Using Helm Charts We will install Skywalking version 9.2.0 for this observability demo. You can run the following command to install Skywalking into a namespace (my namespace is skywalking). You can refer to the values.yaml.
Shell
helm install skywalking oci://registry-1.docker.io/apache/skywalking-helm -f values.yaml -n skywalking
(Optional reading) In the Helm chart values.yaml, you will notice that: We have set the oap (observability analysis platform, i.e., the back-end) and ui flags to true. Similarly, for databases, we have enabled postgresql as true. For tracking metrics from Envoy access logs, we have configured the following environment variables: SW_ENVOY_METRIC: default SW_ENVOY_METRIC_SERVICE: true SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS: k8s-mesh,mx-mesh,persistence SW_ENVOY_METRIC_ALS_TCP_ANALYSIS: k8s-mesh,mx-mesh,persistence This selects the logs and metrics coming from Envoy in the Istio configuration (the two ALS settings are the rules for analyzing Envoy access logs). We will enable the OpenTelemetry receiver and configure it to receive data in otlp format. We will also enable OTel rules according to the data we will send to Skywalking. In a few moments (in Step 3), we will configure the OTel collector to scrape istiod, k8s, kube-state-metrics, and the Skywalking OAP itself. So, we have enabled the appropriate rules: SW_OTEL_RECEIVER: default SW_OTEL_RECEIVER_ENABLED_HANDLERS: "otlp" SW_OTEL_RECEIVER_ENABLED_OTEL_RULES: "istio-controlplane,k8s-cluster,k8s-node,k8s-service,oap" SW_TELEMETRY: prometheus SW_TELEMETRY_PROMETHEUS_HOST: 0.0.0.0 SW_TELEMETRY_PROMETHEUS_PORT: 1234 SW_TELEMETRY_PROMETHEUS_SSL_ENABLED: false SW_TELEMETRY_PROMETHEUS_SSL_KEY_PATH: "" SW_TELEMETRY_PROMETHEUS_SSL_CERT_CHAIN_PATH: "" We have instructed Skywalking to collect data from the Istio control plane, Kubernetes cluster, nodes, services, and also the OAP (Observability Analytics Platform by Skywalking). (The SW_TELEMETRY* settings enable Skywalking OAP's self-observability, meaning it will expose Prometheus-compatible metrics at port 1234 with SSL disabled. Again, in Step 3, we will configure the OTel collector to scrape this endpoint.) In the Helm chart, we have also enabled the creation of a service account for Skywalking OAP. Step 3: Setting Up Istio + Skywalking Configuration After that, we can install Istio using this IstioOperator configuration. In the IstioOperator configuration, we have set up the meshConfig so that every sidecar has the Envoy access log service enabled, with the addresses of the access log service and metrics service pointing to Skywalking. Additionally, with the proxyStatsMatcher, we are configuring all metrics to be sent via the metrics service.
YAML
meshConfig:
  defaultConfig:
    envoyAccessLogService:
      address: "skywalking-skywalking-helm-oap.skywalking.svc:11800"
    envoyMetricsService:
      address: "skywalking-skywalking-helm-oap.skywalking.svc:11800"
    proxyStatsMatcher:
      inclusionRegexps:
        - .*
  enableEnvoyAccessLogService: true
Step 4: OpenTelemetry Collector Once the Istio and Skywalking configuration is done, we need to feed metrics from applications, gateways, nodes, etc., to Skywalking. We have used the opentelemetry-collector.yaml to scrape the Prometheus-compatible endpoints. In the collector, we have mentioned that OpenTelemetry will scrape metrics from istiod, the Kubernetes cluster, kube-state-metrics, and Skywalking.
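For reference, here is a minimal sketch of what such a collector configuration can look like (an illustrative fragment only, not the demo's actual opentelemetry-collector.yaml; the service names, ports, and scrape jobs are assumptions based on the setup described above):
YAML
receivers:
  prometheus:
    config:
      scrape_configs:
        - job_name: 'istiod'                  # Istio control plane metrics
          static_configs:
            - targets: ['istiod.istio-system.svc:15014']
        - job_name: 'kube-state-metrics'      # Kubernetes object metrics
          static_configs:
            - targets: ['kube-state-metrics.default.svc:8080']
        - job_name: 'skywalking-oap'          # OAP self-observability endpoint (port 1234, enabled in Step 2)
          static_configs:
            - targets: ['skywalking-skywalking-helm-oap.skywalking.svc:1234']
exporters:
  otlp:
    endpoint: skywalking-skywalking-helm-oap.skywalking.svc:11800   # Skywalking OAP OTLP/gRPC receiver
    tls:
      insecure: true
service:
  pipelines:
    metrics:
      receivers: [prometheus]
      exporters: [otlp]
The exporter endpoint here matches the OTLP receiver we enabled on the Skywalking OAP in Step 2.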
We have created a service account for OpenTelemetry. Using opentelemetry-serviceaccount.yaml, we have set up a service account and declared a ClusterRole and ClusterRoleBinding to define which actions the opentelemetry service account can take on various resources in our Kubernetes cluster. Once you deploy the opentelemetry-collector.yaml and opentelemetry-serviceaccount.yaml, there will be data flowing into Skywalking from Envoy, the Kubernetes cluster, kube-state-metrics, and Skywalking itself (OAP). Step 5: Observability of Kubernetes Resources and Istio Resources in Skywalking To check the UI of Skywalking, port-forward the Skywalking UI service to a local port (say, 8080). Run the following command:
Shell
kubectl port-forward svc/skywalking-skywalking-helm-ui -n skywalking 8080:80
You can open the Skywalking UI service at localhost:8080. Once you are on the Skywalking UI (refer to the image below), you can select service mesh in the left-side menu and select control plane or data plane. Skywalking would provide all the resource consumption and observability data of the Istio control and data planes, respectively. Skywalking Istio-dataplane provides info about all the Envoy proxies attached to services. Skywalking provides metrics, logs, and traces of all the Envoy proxies. Refer to the below image, where all the observability details are displayed for just one service-proxy. Skywalking provides the resource consumption of Envoy proxies in various namespaces. Similarly, Skywalking also provides all the observable data of the Istio control plane. Note, in case you have multiple control planes in different namespaces (in multiple clusters), you just need to provide access to the Skywalking OAP service. Skywalking provides Istio control plane details like metrics, number of pilot pushes, ADS monitoring, etc. Apart from the Istio service mesh, we also configured Skywalking to fetch information about the Kubernetes cluster. You can see in the below image that Skywalking provides all the info about the Kubernetes dashboard, such as the number of nodes, K8s deployments, services, pods, containers, etc. You also get the respective resource utilization metrics of each K8s resource in the same dashboard. Skywalking provides holistic information about a Kubernetes cluster. Similarly, you can drill further down into a service in the Kubernetes cluster and get granular information about its behavior and performance (refer to the below images). For setting up load to services and seeing the behavior and performance of apps, cluster, and Envoy proxy, check out the full video. Benefits of Istio-Skywalking Integration There are several benefits of integrating Istio and Apache Skywalking for unified observability. Ensure 100% visibility of the technology stack, including apps, sidecars, network, database, OS, etc. Reduce the time to find the root cause (MTTR) of issues or anomalies in production by 90% with faster troubleshooting. Save approximately $2M of lifetime spend on closed-source solutions, complex pricing, and custom integrations.
IBM App Connect Enterprise (ACE) is a powerful and widely used integration tool. Developers create integration flows by defining an entry point that receives a message, then processing that message, and finishing by sending or placing the transformed message. Flows consist of a series of nodes and logical constructs. ACE is powerful and flexible — there are many nodes provided specifically to interact with the systems being integrated, however there are also nodes that can run a script or Java code. Because of this, ACE can do pretty much anything, and as such could be considered (although this is not its intended purpose) as an application runtime environment. An ACE flow is a deployable unit that is inherently stateless, although it can manage its own state. In a traditional server environment, many flows are deployed on an integration server and their execution can be managed and scaled using the workload management features. This makes ACE a natural fit for a Kubernetes environment. It is therefore tempting to consider an ACE flow as a microservice and deploy each flow to a cloud in that way. Whilst this is certainly possible, there are many considerations to ensure a successful deployment of ACE on Kubernetes. The Problem Deployment Overhead Deploying many microservices can result in increased resource overhead compared to traditional application server architectures. Each microservice requires its own operating system and instance of whatever runtime it is using. In ACE terms, this means creating many integration servers with one flow on each. However, an integration server itself requires resources. Consider an application with 100 flows deployed on a server with 4 CPUs and 8GB of RAM. Some of the flows are used heavily throughout the business day, but some are only called once a day for a batch job in the evening. During high usage the busy flows can consume most of the CPU. At peak times they will slow down a bit because they compete for CPU, but this is okay because the CPU can manage many threads. The flow instances handling the batch job will remain idle, consuming no CPU until they are needed when the busy flows are idle. In ACE 12.0.5 and above, the effective minimum CPU requirement is 0.1 CPU and 350MB of memory, so in an application with 100 flows that’s a minimum of 10 CPUs and 35GB of memory which is a significant increase in terms of both infrastructure and license cost. Not only that, but in this example every flow has the minimum allocated CPU at all times, which is nowhere near enough for the busy flows and far more than the once-per-day flows need. Logical vs. Operational Optimization It is possible to analyze the requirements of all your flows and group together flows based on release schedule, demand profile, common connections/assets and so on (see section Efficiency of Scaling Grouped Flows). These criteria would be purely operational, and this could be an entirely valid way of doing it. However, there would be no relationship between where a flow is deployed and its function. Pods and deployments would not have names that identified their functions. This may be possible in a highly automated DevOps setup, but this may not be ideal for most projects. On the other hand, deploying one flow per pod maps deployments perfectly to logical functions — your flow and its pod would have a name relating to its function. 
Generally modern apps rely on naming conventions and design related to logical function to reduce the need for documentation and make the project as easy and transparent as possible to work on. Software applications are created and maintained by humans after all, and humans need things to be clear and straightforward from a human perspective. But as we have seen this is not necessarily feasible in a large ACE project. The aim of this document is to demonstrate ways to link the logical and operational deployment patterns — in other words, to create operational optimization without losing logical design. ACE vs. Other Runtimes If microservices can be deployed in their own pods and scaled as required, what is the difference between ACE and other microservice runtimes such as Open Liberty or NodeJS? Start-up time and cost: Integrations deployed in BAR files need to be compiled before use. If uncompiled BAR files are deployed this adds a CPU spike on start-up. This can also be picked up by the horizontal pod autoscaler. License cost: Since ACE is proprietary software it adds license cost. Performance Planning Pods vs. Instances One of the key concepts of cloud-native microservice architecture is that multiple instances of a service can exist, and state should be stored outside the instance. This means that as many instances as are needed can be deployed. This can be determined at design time, and can be altered whilst running either as a manual operation or automatically according to load. In Kubernetes, a microservice is represented as a deployment, and this creates a given number of pods which are service instances. The number of pods can be scaled up or down. A Kubernetes cluster should have multiple worker nodes, to provide redundancy. Because it is a distributed system, there should also be an odd number of nodes, so that if one fails the other two still constitute a quorum. In practice, this means three worker nodes. This then dictates that deployments should have three pods — one on each node — to spread the load appropriately. It is possible to configure deployments to deploy more pods (via a horizontal pod autoscaler or HPA) to provide more processing power when demand is high. Pod Scaling vs. ACE Workload Management ACE includes native workload management features, one of which is the ability to create additional instances of a deployed message flow within the integration server in the pod as opposed to more pods. Additional instances correspond to additional processing threads. It is possible to start with zero additional instances (i.e., one instance) and set the integration server to create new instances on demand up to a maximum value. The additional instances will be destroyed when no longer needed, and their onward connections (e.g., to a DB) will be closed, however the RAM will not be released back to the OS. So whilst pod CPU usage will return to lower levels RAM will not. This may or may not be a problem depending on your constraints — RAM usage does not affect license cost, however the infrastructure still needs to be provisioned and paid for. Therefore, the additional instances property should be carefully considered in conjunction with Kubernetes pod scaling. Additional flow instances are much more resource-efficient than additional pods containing integration servers. 
The number of additional instances must be determined in conjunction with the number of CPUs allocated to the pod: If a flow is an asynchronous process, e.g., reading from a queue and writing to another, then it can be busy almost all the time assuming the processing done by the flow is CPU intensive. If that is the case, then the total number of flow instances (bear in mind that zero additional instances means one total instance) should match the number of CPUs configured on the pod. However, if a flow is a synchronous process, e.g., it calls a downstream HTTP service or a database, then the flow will be blocked waiting for that downstream service to complete, during which time the thread would be idle. If a flow spends 50% of its time idle waiting for a downstream response and 50% of its time processing the results, then two flow instances (i.e., one additional) can be configured in a pod that has one CPU. Of course, you could create two pods instead of two flow instances, but this is much less efficient because of the overhead of running a second pod and an entire integration server inside it. For a synchronous flow, the following equation is a good starting point if you can measure the CPU utilization when running a single instance: total flow instances ≈ CPUs allocated to the pod ÷ CPU used by a single flow instance. If you are analyzing performance using timings printed out in debugging statements or the user trace features, or using the CP4I tracing tool, then you should know how long the total flow execution takes and how long is spent waiting for a downstream service call. If so, then the equation can be written as follows: total flow instances ≈ CPUs allocated to the pod × (total flow execution time ÷ (total flow execution time − time spent waiting for the downstream call)). In the 50/50 example above, that gives 1 CPU × 2 = two flow instances. If pods are configured like this, then the CPU of the pod should be well utilized, but not so much so that a single pod restarting or failing will cause the others to be overloaded. Of course, if a flow is configured to have two flow instances, then each new pod results in two more flow instances at a time. The benefit of using the HPA for scaling, rather than additional flow instances, is that when load decreases the additional pods will be destroyed, which will release resources. The disadvantage is that more resources are required for the extra pod overhead. In general, the workload management properties should be used for modest scaling, and the HPA used sparingly for highly elastic demand — for example, a bank that needs to deploy large numbers of pods at the end of each month for paydays, or for a retailer needing to handle a Black Friday sale. When using the HPA, it is important to realize that ACE integration servers can have high start-up CPU load if BAR files are not pre-compiled, or if flows are written with start-up tasks, e.g., cache building, resource loading, etc. The HPA may pick up this high CPU usage on start-up and start deploying more pods, which will then need to start up as well — and this could cause runaway resource consumption. To overcome this, configure a stabilization window on your HPA. Performance tuning is of course a complex subject, and a detailed description of ACE performance tuning is outside the scope of this document. Scheduled Scaling An alternative to using the HPA is to use a tool like KEDA to provide scaling based on events such as external messages, or according to a schedule. For example, an e-commerce organization could scale up pods in advance of a product being released or a discount event. This has the benefit of being proactive, rather than reactive like the HPA.
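As a minimal sketch of the scheduled approach (the deployment name, namespace, replica counts, and cron window below are hypothetical; adapt them to your own workloads), a KEDA ScaledObject with a cron trigger could look like this:
YAML
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: ace-flow-payday-scaler
  namespace: ace
spec:
  scaleTargetRef:
    name: ace-flow-deployment        # hypothetical ACE integration server deployment
  minReplicaCount: 3                 # one pod per worker node during normal operation
  maxReplicaCount: 12
  triggers:
    - type: cron
      metadata:
        timezone: Europe/London
        start: "0 6 25 * *"          # scale up ahead of month-end payday processing
        end: "0 22 28 * *"           # scale back down afterwards
        desiredReplicas: "12"
Between the start and end cron expressions, KEDA holds the deployment at desiredReplicas; outside that window, it falls back to the normal replica count.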
Grouping Flows Operationally With a typical application containing many hundreds of flows, we know that we will need to group them together on the same integration servers and therefore pods. There are several operational and deployment/packaging criteria that can be used for this. Group integrations that have stable and similar requirements and performance. Group integrations with common technical dependencies. A popular use case is dependency on MQ. Group integrations where there are common cross dependencies, i.e., integrations completely dependent on the availability of other integrations. Group integrations that have common integration patterns, e.g., sync vs. async. Synchronous calls between flows are much better done in the same integration server because they can use the callable flow pattern rather than HTTP interaction, which would be much less efficient. Group integrations where a common or similar scalability model is needed. Group integrations where a common or similar resilience (high availability) model is needed. Group integrations with a common or similar shared lifecycle. A common situation where this may occur is where integrations use shared data models, libraries, etc. Group integrations that require common Integration Server runtime components, i.e., C++, JVM, NodeJS, etc., to leverage and optimize deployment using the new Dynamic Resource Loading feature. Logical Patterns for Grouping Flows Large enterprise applications depend on consistent design principles for manageability and quality. Choosing specific design patterns for implementation helps this greatly. Integration flows can be deployed anywhere and will still work; however, this kind of deployment will make it hard to create logical groupings. Here we will look at patterns into which integration flows fit; if the physical deployments follow the logical architecture, the flows will be easier to manage. Here are some examples of ideas that could help to group flows by business function and/or integration style, so that they will still fit into a microservice concept. Composite Service Consider a system where ACE is acting as a wrapper for downstream services. Each flow invokes a single operation on a downstream service. Related flows, for example, flows that invoke database stored procedures to create, update or delete customer records, can be grouped into a single Application and deployed on their own integration server. When considered this way, the deployed integration server becomes the customer management service. Workload management can be used statically (i.e., selecting "deploy additional instances at startup") to set the number of instances for each flow relative to the others, if some flows need to handle more traffic than others. Message Hub/Broker If your application has flows that place messages onto queues or Kafka topics, it might be worth considering these as related functions and grouping them all into a single Application deployed in a single pod, called a Message Hub or Broker. Business Function Grouping by business functional area into single services reduces scaling flexibility, which ultimately results in less efficiency — if one flow is overloaded and a new pod is scaled up, then new capacity for other flows may be created and not needed. But, if suitable CPU requests and limits are configured, along with workload management, this may ultimately be more efficient in terms of absolute CPU use.
Efficiency of Scaling Grouped Flows In an ideal microservice environment, each flow would be a microservice that could be scaled independently. However, due to the inherent overhead of integration server pods, this is not efficient, as we have seen. If flows are grouped together though, then when scaling one flow you may end up with instances of other flows that you don't need. This is also inefficient. But in some circumstances it may still be more efficient than multiple pods. Consider two flows, A and B, each in their own pod. Each might initially be using only a negligible amount of CPU, but the minimum requirement for each pod is 0.1, so that is the CPU request. Replicated across three nodes, that means a 0.6 CPU request. Let's assume that the flows are single threaded, so the limit for each pod should be 1 CPU. That means that the upper limit is 6 CPU. Now, if flow A becomes very busy, that means a maximum of 3.3 CPU used by both flows — 3 from three instances of flow A and the original 0.3 from three instances of flow B. If it becomes even busier, you can scale deployment A up by one pod so there are four flow A pods, and this means the total CPU is now 4.3 CPU. If flows A and B are both deployed in the same pod, then when traffic is low the total request is only 0.3 CPU. When flow A becomes busy, this becomes 3 CPU. When it scales up to 4 pods, the total is 4 CPU. Flow B has been deployed again as well, but since it is not using significant CPU, flow A still has the resources it needs. In this scenario, the extra overhead of deploying the two flows independently is greater than the cost of duplicating flow B unnecessarily. Node Level Licensing IBM licenses pods based on the specified CPU limits. However, if ACE pods are restricted to running on a single worker node, IBM will not charge for more than the CPUs available to that node. This means that node affinity (or a node selector) can be used to deploy ACE only on certain nodes. If no CPU limits are set, then any of the pods can use as much CPU as is available on the node, and the node's operating system can schedule the threads as it sees fit in the same way that it does in a non-container environment. This can be a useful technique when doing a lift-and-shift migration from a non-container environment.
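As a minimal sketch of that node pinning (the deployment name, node label, and image below are placeholders rather than an official IBM manifest):
YAML
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ace-flows
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ace-flows
  template:
    metadata:
      labels:
        app: ace-flows
    spec:
      nodeSelector:
        ace-licensed: "true"            # label applied beforehand to the licensed worker node(s)
      containers:
        - name: ace-server
          image: my-registry/ace-server:12.0.5   # placeholder; use your ACE server image
          resources:
            requests:
              cpu: "0.1"
              memory: 350Mi
            # no CPU limit set, so flows can use whatever the licensed node has free
The target node(s) would be labeled beforehand, for example with kubectl label node worker-1 ace-licensed=true.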
My demo, Evolving your APIs, features a custom Apache APISIX plugin. I believe that the process of creating a custom plugin is relatively well-documented. However, I wanted to check the parameters of the _M.access(conf, ctx) function, especially the ctx one. The documentation states: The ctx parameter caches data information related to the request. You can use core.log.warn(core.json.encode(ctx, true)) to output it to error.log for viewing. Unfortunately, core.log ultimately depends on nginx's logging, and its buffer is limited in size. Thanks to my colleague Abhishek for finding the info. For this reason, the ctx display is (heavily) truncated. I had to log data bit by bit; however, it was instructive. The Context The ctx parameter is a Lua table. In Lua, table data structures are used for regular indexed access (akin to arrays) and key access (like hash maps). A single ctx instance is used for each request. The Apache APISIX engine reads and writes data in the ctx table. It's responsible for forwarding the latter from plugin to plugin. In turn, each plugin can also read and write data. I resorted to a custom plugin to conditionally apply rate-limiting in the demo. The custom plugin is a copy-paste of the limit-count plugin. Note that the analysis is done in a specific context. Don't assume the same data will be available in your own setup. However, it should be a good starting point. Overview of the ctx Parameter The data available in the ctx parameter is overwhelming. To better understand it, we shall go from the more general to the more particular. Let's start from the overview. _plugin_name: self-explanatory conf_id: either route ID or service ID proxy_rewrite_regex_uri_capture: data set by the proxy-rewrite plugin. route_id: route ID the plugin is applied to route_name: route name the plugin is applied to real_current_req_matched_path: URI for which matching was done conf_version: etcd-related revision — see below var: references the ctx object and a cache of data about the request, e.g., URI, method, etc. matched_route: the route that was matched based on host header/URI and/or remote_addr; see below plugins: pairs of plugin/data — see below Matched Route The matched_route row is a complex data tree that deserves a detailed description. key: access key in the etcd datastore created_index, modifiedIndex and orig_modifiedIndex: these attributes are related to etcd and how it stores metadata associated with revisions. Different revisions of a single key are logged in the create_revision and pre_revision fields. The former points to the initial created row ID and is constant throughout the changes, while the latter points to the row ID of the previous value. Apache APISIX maps them respectively to the created_index and modifiedIndex values and uses them for caching. In many places, created_index is later assigned to conf_version - see above. prev_plugin_config_ver: after a plugin configuration is merged with the route configuration, the current modifiedIndex is assigned to prev_plugin_config_ver. It allows saving CPU cycles if one attempts to apply the same plugin config later in the call chain.
update_count: replaced with modifiedIndex has_domain: whether the matched route references an upstream with a domain, e.g., http://httpbin.org, or not, e.g., 192.168.0.1 orig_plugins: temporary placeholder used if a route has plugins defined directly and references a plugins config clean_handlers: list of functions scheduled to be called after a plugin has been created value: has keys related to how the route was created, as well as a couple of others:
Shell
curl http://apisix:9180/apisix/admin/routes/2 \
  -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' \
  -X PUT -d '
{
  "name": "Versioned Route to Old API",
  "methods": ["GET"],
  "uris": ["/v1/hello", "/v1/hello/", "/v1/hello/*"],
  "upstream_id": 1,
  "plugin_config_id": 1
}'
priority: since we didn't set it, it has a default value of 0. Priority is essential when multiple routes match to determine which one to apply. create_time: self-explanatory update_time: self-explanatory plugins: references to the plugin's functions status: I couldn't find this Plugins The plugins value contains plugin-related data in an index-based Lua table. Each plugin has two entries: the first (even-indexed) entry contains data related to the plugin in general, e.g., its schema, while the second (odd-indexed) entry contains data related to its configuration in the current route. My setup has two plugins, hence four entries, but to keep things simpler, I kept only a single plugin in the following diagram: Key values match directly to the plugin schema and configuration; you can check the whole descriptions directly in the plugin. A Final Trick I initially had issues printing the ctx table because of the nginx buffer limit and had to do it bit by bit. However, you can print it to a file. Here's the function, courtesy of my colleague Zeping Bai:
Lua
local file, err = io.open("conf/ctx.json", "w+")
if not file then
    ngx.log(ngx.ERR, "failed to open file: ", err)
    return
end
file:write(core.json.encode(ctx, true) .. "\n")
file:close()
Here's the whole data representation, in case you have good eyes: Alternatively, here's the PlantUML representation. Conclusion In this post, I described the structure of the ctx parameter in the access() function. While specific entries vary from configuration to configuration, it gives a good entry point into data manipulated by plugins. It's also a good reminder that even if you're not fluent in a language or a codebase, you can get quite a lot of information by logging some variables. To Go Further ctx parameter
.NET Core and ASP.NET Core are popular frameworks for creating powerful RESTful APIs. In this tutorial, we will use them to develop a simple Minimal API that simulates a credit score rating. Minimal APIs provide a streamlined approach to creating high-performing HTTP APIs using ASP.NET Core. They allow you to construct complete REST endpoints with minimal setup and code easily. Instead of relying on conventional scaffolding and controllers, you can fluently define API routes and actions to simplify the development process. We will create an endpoint allowing a user to retrieve a credit score rating by sending a request to the API. We can also save and retrieve credit scores using POST and GET methods. However, it is essential to note that we will not be linking up to any existing backend systems to pull a credit score; instead, we will use a random number generator to generate the score and return it to the user. Although this API is relatively simple, it will demonstrate the basics of REST API development using .NET Core and ASP.NET. This tutorial will provide a hands-on introduction to building RESTful APIs with .NET Core 7 and the Minimal API approach. Prerequisites Before we start, we must ensure that we have completed several prerequisites. To follow along and run this tutorial, you will need the following: A working .NET Core installation An IDE or text editor of your choice Postman to test our endpoint Creating the Initial Project We'll be using the .NET CLI tool to create our initial project. The .NET command line interface provides the ability to develop, build, run, and publish .NET applications. The .NET CLI new command provides many templates to create your project. You can also add the search command to find community-developed templates from NuGet or use dotnet new list to see available templates provided by Microsoft. We'll be creating a Minimal API and starting from as clean a slate as possible. We'll be using the empty ASP.NET Core template. In the directory of your choosing, enter the following in the terminal:
dotnet new web
You'll notice that the directory structure will look something like this: We'll be doing all of our work in the Program.cs file. Its starting code should look similar to the following:
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

app.MapGet("/", () => "Hello World!");

app.Run();
We can see how concise and readable our starter code is. Let's break down the code provided by the template line by line: The WebApplication.CreateBuilder(args) method creates a new instance of the WebApplicationBuilder class, which is used to configure and build the WebApplication instance. The args parameter is an optional array of command-line arguments that can be passed to the application at runtime. The builder.Build() method is called to create a new instance of the WebApplication class, which represents the running application. This instance configures the application, defines routes, and handles requests. The third line defines a route for the root path ("/") of the application using the app.MapGet() method. This means that when the root path is requested, the application will respond with the string "Hello World!". We start the application by calling the app.Run() method. Using the builder pattern, we can configure and customize the WebApplication instance. This allows us to define the application's behavior, including middleware, routes, and other settings, in a structured and extensible way.
For example, the WebApplication instance created by the builder can be thought of as the "entry point" of the application, which handles requests and generates responses. Overall, this code block creates a simple Minimal API in .NET 7 that responds with a "Hello World!" message when the application's root path is requested. Next, we'll customize our API to mimic retrieving a credit score rating. Adding in the Code In Program.cs, we will house our endpoints and business logic. We'll define our creditscore endpoint to provide GET and POST operations. We'll implement a list to store any credit score we would like. We'll also define an endpoint to retrieve the list of saved credit scores. We'll be utilizing a CreditScore record, a reference type introduced in C# 9 similar to structs. A record is a lightweight and immutable data object optimized for comparison and equality checking. Populate Program.cs with the following code:
var builder = WebApplication.CreateBuilder(args);
var app = builder.Build();

var userAddedCreditScores = new List<CreditScore>();

app.MapGet("/creditscore", () =>
{
    var score = new CreditScore(Random.Shared.Next(300, 850));
    return score;
});

app.MapPost("/creditscore", (CreditScore score) =>
{
    userAddedCreditScores.Add(score);
    return score;
});

app.MapGet("/userAddedCreditScores", () => userAddedCreditScores);

app.Run();

record CreditScore(int Score)
{
    public string? CreditRating
    {
        get => Score switch
        {
            >= 800 => "Excellent",
            >= 700 => "Good",
            >= 600 => "Fair",
            >= 500 => "Poor",
            _ => "Bad"
        };
    }
}
As mentioned, our code first creates a builder object for the web application and then uses it to build an application instance. It also defines a record type called CreditScore with a property called Score and a read-only property called CreditRating. This may look a little strange as we define our record after using it. However, this is because top-level statements must appear before any type declarations, so the record has to be declared at the end of the file. The application exposes multiple endpoints using the app.MapGet() and app.MapPost() methods. The first endpoint, /creditscore, is a GET method that generates a new random CreditScore object with a score between 300 and 850. We'll define a POST method for the same endpoint that accepts a CreditScore object in the request body, adds it to a list called userAddedCreditScores, and returns the same CreditScore object to the caller. The other endpoint, /userAddedCreditScores, is a GET method that returns a list of all the CreditScore objects that have been added to userAddedCreditScores. Finally, the application starts running using app.Run(). Running and Testing the API With our code written, run the following command to compile and run our project:
dotnet run
The API is now operational and ready for testing. After running the previous command, you will see which port has been used to host your API in the console. You can define which port you would like to use by editing the Properties > launchSettings.json file or by editing the app.Run() call in Program.cs like so, replacing 3000 with your desired port number:
app.Run("http://localhost:3000");
You can use a tool like Postman to send an HTTP request to the API. For me, the endpoint to get a credit score is localhost:5242/creditscore. When you send a request to this endpoint, you should receive a 200 OK status code, a credit score generated by the random number generator, and a credit rating. We can save a credit score by sending a POST request to the creditscore endpoint.
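For example, with curl (port 5242 is taken from my run output above, so substitute whatever port your application reports):
curl -X POST http://localhost:5242/creditscore \
  -H "Content-Type: application/json" \
  -d '{"score": 750}'

curl http://localhost:5242/userAddedCreditScores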
We form the request’s body with a CreditScore object. Finally, we can retrieve all added scores by sending a GET request to the /userAddedCreditScores endpoint. Wrapping Up In summary, we have developed a basic RESTful Minimal API using .NET Core 7 and ASP.NET. This code can be a foundation for creating more complex APIs for your application. As you continue to develop the API, you may want to consider implementing security measures such as an API key, integrating with an API gateway, monitoring the usage of the API, or generating revenue through API monetization.
With the clear dominance of microservices architecture, communication between different components of a system is a critical aspect of today's software paradigm. Two popular methods of achieving this communication are REST (direct communication) and message brokers (indirect communication). Each approach has its own set of advantages and trade-offs, making it essential for developers to understand the differences between them in order to make informed decisions when designing and building their systems. Although the two may seem to serve completely different use cases and never intertwine, in many cases and architectures, they do. In this article, we'll delve into the disparities between REST and message brokers in terms of how they communicate, helping you make the right choice for your specific use case. REST REST, a widely used architectural style for designing networked applications, relies on stateless communication between client and server. Here are some key features of REST communication: Request-Response Paradigm: REST operates on a simple request-response model. Clients initiate a request to the server, and the server responds with the requested data or an appropriate status code. HTTP Verbs: REST communication is based on HTTP verbs such as GET, POST, PUT, and DELETE, which correspond to CRUD (Create, Read, Update, Delete) operations. Resource-Oriented: REST revolves around the concept of resources identified by URLs. Each resource represents a distinct entity, and interactions are performed using these URLs. Stateless: REST is designed to be stateless, meaning that each request from the client must contain all the information required for the server to fulfill it. This simplifies server-side management and scalability. Caching: REST communication benefits from HTTP's built-in caching mechanisms, which can improve performance and reduce server load by serving cached responses when appropriate. Message Brokers Message brokers facilitate asynchronous communication between components by allowing them to exchange messages or pieces of information, meaning the sender is not aware at any given time whether a receiver exists or who it is. Here's what you need to know about this approach: Decoupled Architecture: Message brokers promote decoupling between sender and receiver, allowing components to communicate without having to be aware of each other's existence. Publish-Subscribe Model: In the publish-subscribe model, producers (publishers) send messages to specific topics, and consumers (subscribers) interested in those topics receive the messages. This enables broadcasting information to multiple consumers. Message Queues: Message brokers also support point-to-point communication through message queues. Producers send messages to queues, and consumers retrieve messages from those queues, ensuring that each message is processed by a single consumer. Reliability: Message brokers ensure message delivery, even in cases of component failures. This reliability is achieved through features like message persistence and acknowledgment mechanisms. Scalability: Message brokers can be scaled horizontally to handle increasing message volumes and provide load balancing across consumers.
The Story of Microservices The API Gateway, a popular architectural pattern in REST-based systems, serves as a good example of the synchronous communication type. Requests reach a service that acts as an internal router, routing requests based on different values, headers, and query params. Message brokers/queues are widely used in a microservices architecture as well, which follows the asynchronous pattern. In this type of architecture, a service sends a message without waiting for a response, and one or more services process the message asynchronously. Asynchronous messaging provides many benefits but also brings challenges such as idempotency, message ordering, poison message handling, and the complexity of the message broker itself, which must be highly available. It is important to note the difference between asynchronous I/O and the asynchronous protocol. Asynchronous I/O means that the calling thread is not blocked while the I/O operations are executed. This is an implementation detail in terms of the software design. The asynchronous protocol means the sender does not wait for a response. Choosing the Right Approach The decision between REST communication and message brokers depends on various factors, including the nature of your application, communication patterns, and requirements: REST is suitable when: Direct request-response interactions are preferred. Your app requires simplicity in communication patterns. You have very strict communication rules with an almost 1:1 sender/receiver ratio. The scale is small, and so are the number of communicating services, the workloads, and the amount of transferred data. Message brokers are beneficial and a must when: Asynchronous communication is needed and allowed. A many-to-many communication pattern is needed. Components are loosely coupled, allowing for independent scaling. Reliability and guaranteed message delivery are paramount. Publish-subscribe or message queue patterns align with the application’s communication needs. Great scale is needed to support billions of requests in a short period of time, where scaling the microservices themselves would be overkill. In conclusion, both REST and message brokers offer distinct advantages for different scenarios. REST provides simplicity and direct interactions, while message brokers enable decoupled, asynchronous, reliable, and far more scalable communication. The choice between these approaches should be made based on your system’s requirements, the specific communication patterns your application demands, and the maturity of both the environment and the developers themselves.
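As a closing illustration of the distinction drawn above between asynchronous I/O and an asynchronous protocol, consider the small sketch below. It is purely illustrative (the URLs, queue name, and payloads are placeholders, and it assumes the aiohttp and pika libraries): the first function still follows a request-response protocol but uses non-blocking I/O, while the second uses an asynchronous protocol, sending a message without ever expecting a reply.

Python
import asyncio
import aiohttp   # non-blocking HTTP client
import pika      # broker client used for the fire-and-forget example

async def fetch_user(user_id: int) -> dict:
    # Asynchronous I/O: the calling thread is not blocked while the request is
    # in flight, but the protocol is still synchronous: we await a response.
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://localhost:8080/v1/user/{user_id}") as resp:
            return await resp.json()

def publish_user_event(user_id: int) -> None:
    # Asynchronous protocol: the sender does not wait for (or expect) a response.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="user-events")
    channel.basic_publish(exchange="", routing_key="user-events",
                          body=f'{{"userId": {user_id}}}')
    connection.close()

if __name__ == "__main__":
    print(asyncio.run(fetch_user(1000)))
    publish_user_event(1000)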
In today's interconnected business landscape, where organizations rely on a plethora of disparate systems and applications, seamless data exchange and collaboration are paramount. Enterprise Integration Patterns (EIPs) have emerged as a powerful solution to address the challenges of integrating various systems, enabling businesses to achieve streamlined processes, enhanced operational efficiency, and improved decision-making. Enterprise Integration Patterns (EIPs) are a collection of best practices and design tenets that are employed to address typical problems with integrating various systems within an enterprise. With the help of these patterns, problems with data transformation, routing, communication, and cooperation between various applications, services, and platforms can be addressed in a consistent manner. Regardless of the underlying technologies, protocols, or data formats used by the systems, the objective of EIPs is to enable seamless and effective data exchange and collaboration between them. Organizations can improve the interoperability, flexibility, scalability, and maintainability of their integration solutions by implementing EIPs. In this article, we delve into the world of Enterprise Integration Patterns, exploring their significance, common patterns, and their role in transforming businesses. Understanding Enterprise Integration Patterns Enterprise Integration Patterns, introduced by Gregor Hohpe and Bobby Woolf in their book of the same name, provide a catalog of time-tested design solutions for connecting and synchronizing systems within an enterprise. These patterns act as a common language for software architects, developers, and stakeholders, facilitating effective communication and collaboration across diverse teams. Importance of Enterprise Integration Patterns Seamless Data Exchange: EIPs enable the smooth flow of data between different systems, irrespective of their disparate architectures and technologies. They ensure data consistency, integrity, and reliability while maintaining a high level of interoperability. Scalability and Flexibility: EIPs promote scalability by allowing organizations to add or modify systems without disrupting existing integrations. They provide a flexible framework that can accommodate changes in business requirements, supporting growth and evolution. Cost Optimization: By leveraging EIPs, businesses can avoid costly point-to-point integrations and adopt a more centralized and modular approach. This reduces maintenance efforts, minimizes development time, and optimizes resource allocation. Key Concepts in Enterprise Integration Patterns Messages: Messages represent units of data exchanged between systems. They can be structured in various formats such as XML, JSON, or plain text. Messages carry information from one system to another, enabling communication and data synchronization. Channels: Channels serve as communication pathways or conduits through which messages flow. They provide a medium for sending and receiving messages between systems. Channels can be implemented using message queues, publish-subscribe mechanisms, or other communication protocols. Message Endpoints: Message endpoints are the integration points where systems interact with each other by sending or receiving messages. Endpoints define the interfaces and protocols used for message exchange, ensuring that messages are correctly transmitted and received by the intended systems. 
Message Routing: Message routing involves directing messages from a source system to one or more destination systems based on certain criteria. Routing can be based on content, metadata, or specific rules defined in the integration solution. It ensures that messages reach the appropriate systems for processing. Message Transformation: Message transformation involves modifying the structure or format of messages to ensure compatibility between systems. It includes activities like data mapping, validation, enrichment, and conversion from one data format to another. Transformation ensures that data is correctly interpreted and processed by the receiving system. Message Splitting and Aggregation: Sometimes, it is necessary to break down or split large messages into smaller, more manageable parts for processing. Conversely, message aggregation involves combining multiple smaller messages into a single message for further processing or analysis. Splitting and aggregation enable efficient data processing and collaboration between systems. Benefits of Enterprise Integration Patterns Standardization: EIPs provide a standardized approach to integration, allowing organizations to establish a common language and understanding among architects, developers, and stakeholders. This promotes better collaboration and communication, reducing complexity and enabling effective teamwork. Reusability: EIPs encapsulate proven design solutions to common integration challenges. By leveraging these patterns, organizations can build reusable components and frameworks, reducing development effort and promoting code reuse across different integration projects. Scalability and Flexibility: EIPs enable organizations to build scalable and flexible integration solutions. The patterns support the addition of new systems, modification of existing systems, and handling increased data volume without disrupting the overall integration architecture. This allows businesses to adapt to changing requirements and scale their integration infrastructure as needed. Maintainability: EIPs promote modular and decoupled integration solutions, making it easier to maintain and update individual components without affecting the entire system. This simplifies troubleshooting, debugging, and maintenance activities, resulting in improved system reliability and stability. Performance and Efficiency: By employing message routing, filtering, and transformation techniques, EIPs help optimize performance and reduce unnecessary data processing. Messages are selectively processed and delivered to the appropriate systems, improving system efficiency and response times. Common Enterprise Integration Patterns Publish-Subscribe: This pattern enables systems to publish messages to specific channels, and other systems that have subscribed to those channels receive the messages. It facilitates broadcasting information to multiple systems simultaneously. Request-Reply: In this pattern, a system sends a request message to another system and expects a reply message in response. It enables synchronous communication between systems, where the requester waits for a response before proceeding further. Message Translator: This pattern focuses on transforming messages from one data format or protocol to another. It enables interoperability between systems that use different data representations, allowing them to understand and process messages correctly. 
Message Filter: This pattern enables the selective filtering of messages based on specific criteria, allowing systems to process only the relevant information. It filters out messages that do not meet the specified conditions, ensuring that only relevant messages are processed, which enhances system performance by reducing the amount of unnecessary data being processed. Content-Based Router: This pattern routes messages to different destinations based on the content of the messages. It examines the content of incoming messages and determines the appropriate destination or processing path based on predefined rules or conditions. Message Splitter: The message splitter pattern divides a single message into multiple smaller messages. It is useful when a system needs to process individual parts of a large message separately or when distributing work among multiple systems or processes. Message Aggregator: This pattern combines multiple smaller messages into a single larger message. It is used when multiple systems produce related messages that need to be aggregated and processed as a whole. Message Broker: The message broker pattern acts as an intermediary between sender and receiver systems. It receives messages from sender systems, stores them temporarily, and ensures reliable delivery to the appropriate receiver systems. It decouples systems and provides asynchronous message exchange. Event-Driven Consumer: This pattern enables systems to react to events or messages asynchronously. Instead of actively requesting or polling for new messages, systems listen for events or messages and respond accordingly when they occur. Service Activator: The service activator pattern triggers a service or system to perform a specific action in response to an incoming message. It invokes the appropriate service or component to process the message and generate a response if required. Message Routing: This pattern deals with the flow and transformation of messages between systems. It includes filters, content-based routers, and dynamic routers, enabling messages to be selectively delivered based on content, destination, or other parameters. Message Transformation: This pattern facilitates the transformation of data formats and structures to ensure compatibility between systems. It includes techniques such as message enrichment, translation, and normalization. Message Endpoint: This pattern represents the integration point where systems send or receive messages. It encompasses concepts like publish-subscribe, request-reply, and message-driven beans, enabling asynchronous communication and decoupling of systems. Message Construction: This pattern focuses on constructing complex messages from simpler ones. It includes techniques like message aggregation, composition, and splitting, allowing systems to collaborate efficiently by exchanging composite messages. Message Routing Channels: This pattern establishes channels that facilitate communication between systems. Channels can be implemented as message queues, publish-subscribe topics, or message brokers, providing reliable and scalable integration solutions. Integration Frameworks and Tools Several integration frameworks and tools have been developed to implement Enterprise Integration Patterns effectively.
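Before looking at those frameworks, here is a small, framework-free sketch of a few of the patterns above (the message filter, the content-based router, and the splitter/aggregator pair). Every name, rule, and field in it is invented purely for illustration; the frameworks discussed next provide production-ready DSLs for exactly these patterns.

Python
from typing import Dict, List, Optional

def message_filter(message: Dict) -> Optional[Dict]:
    # Message Filter: drop messages that do not meet the criteria so that only
    # relevant messages travel further down the integration pipeline.
    return message if message.get("type") == "order" else None

def content_based_router(message: Dict) -> str:
    # Content-Based Router: choose a destination channel from the message content.
    if message.get("country") == "US":
        return "us-orders"
    if message.get("priority") == "high":
        return "priority-orders"
    return "default-orders"

def splitter(batch: Dict) -> List[Dict]:
    # Message Splitter: break one large message into per-item messages.
    return [{"batchId": batch["batchId"], "line": line} for line in batch["lines"]]

def aggregator(messages: List[Dict]) -> Dict:
    # Message Aggregator: recombine related messages into a single composite message.
    return {"batchId": messages[0]["batchId"],
            "lines": [m["line"] for m in messages]}

incoming = [
    {"type": "order", "country": "US", "batchId": 1,
     "lines": [{"sku": "A", "qty": 2}, {"sku": "B", "qty": 1}]},
    {"type": "heartbeat"},   # filtered out: not relevant to order processing
]

for message in incoming:
    accepted = message_filter(message)
    if accepted is None:
        continue
    destination = content_based_router(accepted)
    parts = splitter(accepted)
    print(f"route to {destination}: {aggregator(parts)}")

In a real integration solution, the destinations would be channels such as queues or topics rather than strings, and an integration framework would supply the plumbing around functions like these.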
Apache Camel, Spring Integration, and MuleSoft are some popular frameworks that provide extensive support for designing, implementing, and managing integration solutions. These frameworks offer a wide range of connectors, processors, and adapters, simplifying the development process and reducing time to market. Conclusion Enterprise Integration Patterns have become a key building block for developing reliable and scalable integration solutions in today's complex business environment. EIPs give businesses the tools they need to overcome the difficulties of integrating dissimilar systems, ensuring smooth data exchange, and promoting collaboration. They do this by offering a comprehensive catalog of tested design solutions. By embracing EIPs and utilizing integration frameworks, businesses can achieve operational efficiency, agility, and innovation and thereby gain a competitive edge in the digital landscape. Enterprise Integration Patterns are essential for achieving effective and seamless integration of various systems within an organization. By implementing these patterns, organizations can get past the difficulties associated with data transformation, routing, and coordination, enabling them to create scalable, adaptable, and maintainable integration solutions. Organizations can streamline their operations, improve collaboration, and gain a competitive edge in today's interconnected business environment by utilizing the advantages of standardization, reusability, and performance optimization.
Enterprise security teams have had since 2015 to familiarize themselves with GraphQL API security. But many — if not most — still haven’t captured the security nuances of the popular open-source query language. Simply understanding GraphQL’s processes and vulnerable attack vectors isn’t sufficient; it’s also necessary to recognize exploit attempts and nefarious queries (and trigger actions whenever those threats arise). A complete GraphQL security strategy must also be ready to defeat attacks designed to infiltrate GraphQL specifically. A more generalized API security strategy isn’t going to cut it, as headlines continue to prove. Security teams likely have either a web application firewall (WAF) performing active application monitoring to detect threats or a comparable in-house solution that leverages access logs to monitor threat behavior. They also likely depend on specific indicators when monitoring for anomalous activity — including HTTP methods and response status codes, sensitive API routes, and API parameters. Keeping an eye on these indicators can identify attacks where clients try to overwhelm a registration endpoint, perform multiple unsuccessful logins, attempt account enumeration, or tamper with key parameters. With REST APIs, monitoring these events is relatively simple. An application only needs to return HTTP status codes that adhere to the RFC 2616 protocol for clients and security teams to receive and understand clear activity signals (as long as security teams have a relatively accurate API inventory). This traditional monitoring approach relies on HTTP components. For example, the following rule uses the GET HTTP method and the 403 Forbidden response code to trigger a security alert if a client tries to access forbidden locations on the /v1/user/<id>/profile route (available for most REST APIs):

alert if (http.method=GET and response.status_code=403 and http.uri="/v1/user/<id>/profile")

GraphQL routes don’t matter in the same way. First, GraphQL uses its own singular route (usually /graphql). But more importantly, client intentions with respect to GraphQL APIs are determined by the query itself. GraphQL APIs may allow GET and POST methods, but only the query offers an accurate picture of what the client is really trying to accomplish. Therefore, traditional monitoring rules based on HTTP components aren’t at all effective at monitoring GraphQL API security. GraphQL APIs also don’t return traditional HTTP status codes. It varies between different implementations of the GraphQL server, but it will usually return a 200 OK code to a query regardless of whether errors exist. Errors can instead be monitored by checking the server response using a dedicated errors JSON key. The following example query lets a user receive their own user details by providing their user ID number:

GraphQL
query {
  user(id: 1000) {
    email
    name
    address
  }
}

A client with the authorization to view user ID 1000 will receive a response like this (provided that the ID exists):

JSON
{
  "data": {
    "user": [
      {
        "email": "test@example.com",
        "name": "test user",
        "address": "123 Hollywood Blvd"
      }
    ]
  }
}

However, a client without the correct authorization will receive a GraphQL response like this:

JSON
{
  "errors": [
    { "message": "You are not authorized to view this user." }
  ]
}

In each of these scenarios, the GraphQL server might simply send a 200 OK HTTP code, making monitoring more challenging than with REST APIs.
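Because the HTTP status code alone says so little, monitoring logic has to inspect the response body itself. The snippet below is an illustrative sketch (the endpoint, query, and alerting function are placeholders, not part of any particular product) that keys off the GraphQL errors field rather than the status code:

Python
import requests

GRAPHQL_URL = "http://localhost:4000/graphql"   # placeholder endpoint

def alert(reason: str, payload) -> None:
    # Placeholder for whatever alerting pipeline you use (SIEM, pager, log sink).
    print(f"ALERT: {reason}: {payload}")

def monitored_query(query: str) -> dict:
    response = requests.post(GRAPHQL_URL, json={"query": query})
    body = response.json()

    # The server may return 200 OK even for failed queries, so inspect the
    # GraphQL "errors" key instead of relying on the HTTP status code.
    if "errors" in body:
        alert("GraphQL query returned errors", body["errors"])
    return body

monitored_query("query { user(id: 1000) { email name address } }")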
Traditional monitoring tools based on standard HTTP access logs will provide unhelpful log lines, such as these:

1.2.3.4 - - [02/Aug/2022:18:27:46 +0000] "POST /graphql HTTP/1.1" 200 - "-" "curl/7.43.0" - 539
1.2.3.4 - - [02/Aug/2022:18:27:46 +0000] "POST /graphql HTTP/1.1" 200 - "-" "curl/7.43.0" - 539
1.2.3.4 - - [02/Aug/2022:18:27:46 +0000] "POST /graphql HTTP/1.1" 200 - "-" "curl/7.43.0" - 539

Proper security monitoring requires full visibility into both query and response payloads and tooling to contextualize threat events. Unfortunately, GraphQL also allows for batching multiple queries in a single HTTP request, offering attackers the ability to hide a number of actions within just one HTTP packet. The following example query would appear as a single HTTP request while making three separate queries:

GraphQL
query {
  alias1000: user(id: 1000) { email name address }
  alias1001: user(id: 1001) { email name address }
  alias1002: user(id: 1002) { email name address }
}

It gets even better for attackers eager to conceal their activities: some GraphQL servers support arrays of batched queries, which look like this:

Python
import requests

queries = [
    {"query": "query { user(id: 1000) { email name address } }"},
    {"query": "query { user(id: 1001) { email name address } }"},
    {"query": "query { user(id: 1002) { email name address } }"},
]

r = requests.post('http://test.inigo.local/graphql', json=queries)
print(r.json())

All this shows that GraphQL simply cannot be effectively secured using incumbent API security strategies alone. Security teams must introduce processes specifically designed to provide a robust observability layer for safeguarding GraphQL APIs and applications and be able to identify threat activity even if it’s hidden within arrays of batched queries. Open-source GraphQL offers tremendous advantages for developers and enterprises but also opens the door to a unique set of security risks. Any organization currently utilizing GraphQL alongside generalized traditional protections should take action now to introduce an effective GraphQL security strategy.
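As a closing illustration, a monitoring layer that wants to surface this kind of batching has to look inside the query documents themselves. The sketch below is illustrative only (the threshold, regex, and field handling are arbitrary choices, not a recommended detection rule): it counts aliased operations and array-batched queries packed into a single request body.

Python
import json
import re

ALIAS_PATTERN = re.compile(r"\b\w+\s*:\s*\w+\s*\(")   # e.g. "alias1000: user("
MAX_OPERATIONS = 5                                     # arbitrary example threshold

def count_operations(body: bytes) -> int:
    """Count how many queries/aliases are packed into one HTTP request body."""
    payload = json.loads(body)
    # Array batching: a JSON list of {"query": ...} objects instead of a single object.
    queries = payload if isinstance(payload, list) else [payload]
    total = 0
    for item in queries:
        query = item.get("query", "")
        # Each aliased selection counts as one operation; a query with no
        # aliases still counts as one.
        total += max(1, len(ALIAS_PATTERN.findall(query)))
    return total

def is_suspicious(body: bytes) -> bool:
    return count_operations(body) > MAX_OPERATIONS

example = json.dumps(
    [{"query": "query { alias1: user(id: 1) { email } alias2: user(id: 2) { email } }"}]
).encode()
print(count_operations(example), is_suspicious(example))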
Apache Airflow and Airbyte are complementary tools that can be used together to meet your data integration requirements. Airbyte can be used to extract data from hundreds of sources and load it to any of its supported destinations. Airflow can be used for scheduling and orchestration of tasks, including triggering Airbyte synchronizations. The combination of Airflow and Airbyte provides a flexible, scalable, and maintainable solution for managing your data integration and data processing requirements. In this tutorial, you will install Airbyte Open Source and Apache Airflow running in a local Docker Desktop environment. After installation, you will configure a simple Airbyte connection. Next, you will create an Airflow directed acyclic graph (DAG), which triggers a data synchronization over the newly created Airbyte connection and then triggers (orchestrates) some additional tasks that depend on the completion of the Airbyte data synchronization. What Is Apache Airflow? Apache Airflow is an open-source orchestrator tool that is used for programmatically scheduling and monitoring workflows. It is often used to manage a sequence of tasks performed by a data integration pipeline. With Airflow, users can define workflows as directed acyclic graphs (DAGs), where each task in the DAG represents an individual operation. Tasks can be executed in parallel or sequentially and can be scheduled to run at specific times or in response to certain events. Airbyte and Apache Airflow Together Airflow can execute tasks that are required as a prerequisite to triggering an Airbyte sync run and/or can be used for scheduling tasks that depend on the completion of an Airbyte sync run. In this tutorial, you will create a simple Airflow DAG that executes tasks on your local environment and that does the following: Triggers an Airbyte ELT pipeline that loads data from the faker source and writes the data to the local filesystem. Waits for the ELT pipeline’s synchronization to complete. Verifies that the expected local file exists. Renames a local file that was created by the synchronization. ℹ️ The purpose of this tutorial is to show how easy it is to set up an Airflow DAG to interact with Airbyte, as well as to give a small taste of the power of Airflow DAGs. This is demonstrated with a simple example, which may be used as a starting point for implementing a more complex real-world use case. Versions There may be future modifications to the API and/or Airflow that could render some of the instructions given in this tutorial obsolete. The instructions presented in this tutorial were created in February 2023, and the following tools were used: Airbyte OSS 0.40.32 Docker Desktop v4.10.1 macOS Monterey Version 12.5.1 MacBook Pro with the Apple M1 Pro Chip Airflow v2.5.1 Git Version: .release:2.5.1+49867b660b6231c1319969217bc61917f7cf9829 Install Airbyte If you already have a local copy of Airbyte running, then you may skip this section. Otherwise, follow the instructions to deploy Airbyte. [Optional] Modify BASIC_AUTH_USERNAME and BASIC_AUTH_PASSWORD in the (hidden) .env file. For this tutorial, I use the following default values: Once Airbyte is running, in your browser, type in localhost:8000, which should prompt you for a username and password as follows: Airbyte OSS login prompt Create a Connection Create a connection that sends data from the Sample Data (Faker) source to the Local JSON (file system) output.
Click on “Create your first connection” as shown below: You should then see an option to set up a source connection. Select the Faker source from the dropdown as shown below. After selecting Sample Data as the source, you will see a screen that should look as follows. Click on Set up source as shown below. Configure Sample Data (Faker) as a source You will then wait a few seconds for the Sample Data source to be verified, at which point you will be prompted to configure the destination that will be used for the connection. Select Local JSON as shown below: After selecting Local JSON as the output, you will need to specify where the JSON files should be written. By default, the path that you specify will be located inside /tmp/airbyte_local. In this tutorial, I set the destination to /json_from_faker, which means that the data will be copied to /tmp/airbyte_local/json_from_faker on the localhost where Airbyte is running. After specifying the Destination Path, click on Set up Destination. Configure the Local JSON destination This will take you to a page to set up the connection. Set the replication frequency to Manual (since we will use Airflow to trigger Airbyte syncs rather than using Airbyte’s scheduler) and then click on Set up connection, as highlighted in the image below. Trigger a sync from the Sample Data (Faker) source to the Local JSON output by clicking on Sync now, as highlighted in the image below. Manually trigger a sync from the UI The sync should take a few seconds, at which point you should see that the sync has succeeded, as shown below. After the sync has been completed You can now confirm that some sample data has been copied to the expected location. As previously mentioned, for this example, the JSON data can be seen in /tmp/airbyte_local/json_from_faker. Because there were three streams generated, the following three JSON files should be available: You have now created a simple example connection in Airbyte, which can be manually triggered. A manually triggered connection is ideal for situations where you wish to use an external orchestrator. In the next section, you will see how to trigger a manual sync on this connection by hitting a REST endpoint directly. After that, you will see how Airflow can be used to hit that same endpoint to trigger synchronizations. Test the API Endpoints With cURL Before using the REST endpoint from within Airflow, it is useful to verify that it is working as expected. Get the connectionId from the URL shown in your browser as annotated in the following image: Get the Airbyte connection ID You can use cURL to verify that Airbyte’s API endpoint is working as expected. Be sure to update the connectionID in the following command to reflect the value extracted from the URL above. Execute a call to the REST API as follows: The above command should respond with the following, which indicates that a Sync has started: If you look in the UI, you will see that a sync executes each time that you run the cURL command. In my case, I have executed the command twice within a minute of each other, so my UI looks as follows: View the Sync History Install and Launch Airflow Now that you have verified that the REST endpoint is functioning as expected, we’ll start working with Airflow, which will trigger that same Airbyte API endpoint to execute a sync. The instructions for this section are based on Running Airflow in Docker, with additional information about how to get the Airbyte provider installed.
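As an aside before diving into Airflow: the manual API test described above can also be scripted. The following is only a sketch based on the Airbyte OSS API of this era; the endpoint path, default credentials, and connection ID are assumptions that you should adapt to your own deployment.

Python
import requests

AIRBYTE_API = "http://localhost:8000/api/v1/connections/sync"   # assumed Airbyte OSS endpoint
CONNECTION_ID = "<your-connection-id>"                           # taken from the Airbyte UI URL

response = requests.post(
    AIRBYTE_API,
    auth=("airbyte", "password"),        # default BASIC_AUTH_USERNAME / BASIC_AUTH_PASSWORD
    json={"connectionId": CONNECTION_ID},
)
response.raise_for_status()
print(response.json())   # contains the details of the sync job that was started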
Create a new folder that will be used for your Airflow installation and cd into it: Download the Docker-compose file for running Airflow as follows: Then, create additional required sub-directories and assign the Airflow UID as follows: In order to support Airbyte, the Airflow Docker image requires Airbyte provider functionality. The first step is to create a file called Dockerfile in your airflow folder with the following contents: In order to enable the building of a new Airflow image using the Dockerfile that you have just created, you should uncomment the following line in docker-compose.yaml: Additionally, add the following line to docker-compose.yaml in order for Airflow to see and manipulate files in the local folders that Airbyte will write to: The relevant changes to docker-compose.yaml are highlighted in the following image: The relevant portions of the docker-compose.yaml file Build the docker image as follows: This should show that the Airbyte providers specified in the Dockerfile have been installed, as shown in the following image: Confirmation that the Airflow provider has been installed Next, you can initialize Airflow as follows: You are now ready to start Airflow! Execute the following command to launch Airflow and its associated containers: Once the containers are running, you can view the list of the containers with the following command: In my case, there are several containers running for Airbyte and several containers running for Airflow, as shown in the image below: The Docker containers which are running for Airflow and Airbyte Confirm that the Airbyte folders are visible from Airflow by logging into the Airflow scheduler container with the following command: From within that container, you should be able to see an airbyte_local folder in the /tmp directory as follows: Confirmation that the airbyte_local directory is visible Next, you will log in to Airflow by setting your browser to localhost:8080, which should look as follows: Airflow login prompt As documented in Airflow’s REST API instructions, the default username is airflow, and the default password is also airflow. Once you have logged in, you will see a screen that looks as follows: Airflow screen after login Create Airflow Connections Airflow has its own concept of connections, and we will make use of an Airflow connection to trigger a synchronization using an Airbyte connection. In order to demonstrate how Airflow can execute additional tasks that depend on the completion of an Airbyte synchronization, we will also define an Airflow connection that will be used for accessing and modifying files on the local filesystem. To define a connection Airflow will use to communicate with Airbyte, go to admin→connections as shown below: Create an Airflow connection Then click on the + symbol as annotated in the image below: Click on the button to create a new Airflow connection Complete the information about the connection that Airflow will use to connect to Airbyte as follows, and click on the Test button. This should look as follows: Configure an Airflow connection to Airbyte The connection parameters are: Connection Id: Define an identifier that Airflow DAGs can use to communicate with Airbyte. In this example, the identifier is given the name airflow-call-to-airbyte-example, which will be used in the DAG definition (shown later). Connection Type: Specifies that this is a connection to Airbyte. Note that if you do not see Airbyte in the dropdown menu, then the Docker image has not been correctly built. 
Adding the Airbyte provider to the Docker image was done earlier in this tutorial. Host: The host that is running Airbyte. Note the use of host.docker.internal, which resolves to the internal IP address used by the host, as discussed in Docker’s instructions on network interfaces. Login: The default user to connect to Airbyte is airbyte. If you have changed this, then use whichever username you have defined. Password: If you are using the default, then the value is password. If you have changed this, then use whichever password you have defined. Port: By default, Airbyte listens on port 8000. Click on Save, which should take you back to the Connections screen. Because the DAG that we will define is also going to manipulate files, we will also create a File connection. Again, click on the + symbol as shown below: Create another Airflow connection This should take you to a screen that looks like the following: Create an Airflow connection to manipulate files on the local filesystem The connection parameters are: Connection Id: As mentioned above, this will be used in the DAG to connect to the file system. In this example, the value is set to airflow-file-connector. Connection Type: Select File (path). This connector will be used in the DAG to interact with files on the local filesystem. After saving the above connection, your Connections screen should look as follows: The Airflow connections that have just been created Now that the relevant Airflow connections are defined, they can be used in an Airflow DAG. Create an Airflow DAG In this section, I present Python code for a simple DAG that performs the following tasks: trigger_airbyte: Uses AirbyteTriggerSyncOperator to asynchronously trigger Airbyte to perform a synchronization from the Sample Data (Faker) input to the Local JSON (file) output using the Airbyte connection that we defined above. Because this is executed asynchronously, it immediately returns along with a job id that is used for determining the completion of the synchronization. wait_for_sync_completion: Uses AirbyteJobSensor to wait for Airbyte to complete the synchronization. raw_products_file_sensor: Uses FileSensor to confirm that the file created by Airbyte exists. One of the files created by the Sample Data (Faker) source is called _airbyte_raw_products.jsonl, and this task waits for that file to exist. mv_raw_products_file: Uses BashOperator to rename the raw products file. The code which demonstrates these steps is given below. Copy the code into a file called example_dag.py in the airflow/dags directory that you created earlier in this tutorial. Then, set the AIRBYTE_CONNECTION_ID to the value that you extracted from the Airbyte connection URL earlier. Furthermore, the file paths assume that you have specified /json_from_faker in the Airbyte connector that we defined earlier — if this is not your case, then update RAW_PRODUCTS_FILE and COPY_OF_RAW_PRODUCTS in the code to reflect the correct path. In order to see the new DAG, click on DAGs on the top of the screen and then click on the refresh button highlighted below: View a list of the DAGs and click on the refresh button After some time, the DAG which you just added to the DAGs folder will appear. 
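A minimal sketch of such a DAG might look like the following. This is an illustrative reconstruction rather than the exact code from the original setup; it assumes the Airflow connection IDs created above (airflow-call-to-airbyte-example and airflow-file-connector), the /json_from_faker destination path, and an Airbyte connection ID placeholder that you must replace with your own value.

Python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.sensors.filesystem import FileSensor

AIRBYTE_CONNECTION_ID = "<your-airbyte-connection-id>"   # from the Airbyte connection URL
RAW_PRODUCTS_FILE = "/tmp/airbyte_local/json_from_faker/_airbyte_raw_products.jsonl"
COPY_OF_RAW_PRODUCTS = "/tmp/airbyte_local/json_from_faker/moved_raw_products.jsonl"

with DAG(
    dag_id="airbyte_example_airflow_dag",
    default_args={"retries": 1, "retry_delay": timedelta(minutes=1)},
    schedule=None,                      # triggered manually from the Airflow UI
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    trigger_airbyte = AirbyteTriggerSyncOperator(
        task_id="trigger_airbyte",
        airbyte_conn_id="airflow-call-to-airbyte-example",
        connection_id=AIRBYTE_CONNECTION_ID,
        asynchronous=True,              # returns immediately with a job id
    )

    wait_for_sync_completion = AirbyteJobSensor(
        task_id="wait_for_sync_completion",
        airbyte_conn_id="airflow-call-to-airbyte-example",
        airbyte_job_id=trigger_airbyte.output,
    )

    raw_products_file_sensor = FileSensor(
        task_id="raw_products_file_sensor",
        fs_conn_id="airflow-file-connector",
        filepath=RAW_PRODUCTS_FILE,
    )

    mv_raw_products_file = BashOperator(
        task_id="mv_raw_products_file",
        bash_command=f"mv {RAW_PRODUCTS_FILE} {COPY_OF_RAW_PRODUCTS}",
    )

    trigger_airbyte >> wait_for_sync_completion >> raw_products_file_sensor >> mv_raw_products_file

The final line wires the four tasks into the sequence described above, and the dag_id is what will appear in the Airflow DAG list.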
The name that will appear corresponds to the dag_id you specify in the code: This will appear in the list of DAGs as follows: Ensure that the DAG that you have created appears in the list View the New DAG The DAG that is specified by the above code can be viewed in Airflow by clicking on the Graph button that is annotated in the following illustration and looks as follows: View a graph of the tasks in the selected DAG Execute the Airflow DAG Click on the newly created DAG called airbyte_example_airflow_dag highlighted in the image above. This will take you to a screen that gives more information about the DAG. Run the DAG by clicking on the button in the top right corner as annotated in the following image: Trigger the Airflow DAG that executes an Airbyte synchronization After triggering the DAG, you will see a screen similar to the following, which indicates that it is executing: The status of the Airflow DAG Each time the above DAG is executed, you should see an associated Sync in Airbyte’s Sync History UI as follows: The status of the synchronization that has been executed by Airbyte Finally, once the DAG has been completed, you can look in your local file system to see the files that Airbyte created, as well as the file that Airflow renamed from _airbyte_raw_products.jsonl to moved_raw_products.jsonl. Your /tmp/airbyte_local/json_from_faker folder should look as follows: Relevant files that exist on the local filesystem Conclusion This article has shown you how to set up a simple Airbyte connection for which synchronizations are triggered by an Airflow DAG. After the completion of each synchronization, Airflow checks that the expected file exists and then renames that file. While the tasks that are demonstrated in this tutorial are simple, the concepts that they have demonstrated are powerful. You can extend the functionality demonstrated here to build complex Airbyte ELT pipelines that are orchestrated by Airflow!
In a Cloud Pak for Integration (CP4I) environment, the IBM App Connect operator does a good job monitoring the Integration Server pods and will restart any pods where the Integration Server fails, either in the rare event that it has crashed or because it is no longer responsive to liveness checks. It is worth noting that by far the most common reason for an Integration Server pod to fail in one of these ways is that it has insufficient resources to process its workload; however, there are some cases where the user may wish to initiate a restart in response to some other external criteria. In this example, we will consider the case where user-supplied Java code in a Java Compute Node has caused an OutOfMemory error. In this instance, the error would not ordinarily be fatal to the Integration Server; however, a user may wish to restart the Integration Server on receiving an error of this type. The same approach can be generalized to any error condition which can be detected in a file on the IS container internal file system. We also assume that the target environment is a Cloud Pak for Integration instance; however, the same approach can be applied to any IBM App Connect Enterprise Operator installation, including on plain Kubernetes. Indeed, some of the elements we discuss in this article can be further generalized to traditional on-premises environments. Overall Approach The approach we will take here has the following steps: A background script will monitor a file or set of files looking for specific error text. When the text is matched, the script will kill the parent process of the IntegrationServer. The OpenShift system will notice that the pod has failed and start a new instance of this pod. The Monitor Script Let's take a look at the script we will be using:

echo "Starting monitor script"
nohup tail -n0 -F $1/log/*events* | awk '/OutOfMemoryError/ { system("echo \"Detected Error, restarting pod\"") system("kill 1") }' 2>$1/log/monitor.out 1> $1/log/monitor.err &
echo "Script started"

This script starts off by writing a message that it is about to start. This is useful as it allows us to also confirm in the IntegrationServer stdout that the monitoring script is actually installed. The next line initiates a background process. The nohup command along with the "&" at the end of this line of the script means that the next set of commands will be run as a background process and will not terminate when the parent shell terminates. This is important, as we need to return control to the Integration Server after the script is launched. It should also be noted that it is necessary to redirect the stdout and stderr of the background process to ensure that the script as a whole continues executing and returns control to the Integration Server. Without the redirection, when the script completes it attempts to close the stdout/stderr stream of the child process, but this can't be done because the command we are running (tail) will never actually complete. Now let's consider what we are actually doing inside the background process. Specifically, this inner portion of the 2nd line:

tail -n0 -F $1/log/*events* | awk '/OutOfMemoryError/ { system("echo \"Detected Error, restarting pod\"") system("kill 1") }'

In a CP4I environment, a copy of all BIP messages is written to the "Event Log" which is located in /home/aceuser/ace-server/log. These files have names of the form integration_server.test-oom.events.txt.X where X is either a number or empty.
Each time the IS restarts, it will "roll over" onto a new log file, and similarly, if one of these log files reaches its size limit, a new one will be created. Once the maximum number of log files is reached, the IS will start writing to the first log file again. Since the log files may not all exist at startup and may be rolled over over time, we need to use the -F flag for tail to ensure that tail still picks up changes if the file descriptor changes as a result of a log file rolling over or a new log file being created. The other thing to note about the tail command is that it is set to -n0 so that it does not print any context on initial execution and will instead simply output all new lines added to the monitored file after the initial execution of the tail command. This is important because if the monitored files are either in the work dir (like the log files) or on a persistent volume, then they may survive a pod restart. So, in order to prevent the same log lines being re-processed when a pod restarts, we need to make sure only new lines are output by tail. The next part of the command is a simple awk script. This script contains a regex that matches the error text we want to use as a trigger condition. In this case, we want to capture any message that contains the string "OutOfMemoryError". If the line output by tail matches, then the awk script will write a message to the stdout of the background process (which will be redirected to the /home/aceuser/ace-server/log/monitor.out file) and then issue a kill command against pid 1. In a CP4I environment, pid 1 will always be the runaceserver process, and killing this process will cause the OpenShift platform to recognize that the pod has failed and restart the pod. Deploying the Monitoring Script Now that we have our monitoring script, we need to configure the Integration Server to execute it during IS startup. To do this, we can deploy it to the CP4I environment using a "generic" configuration. We first need to place the script in a zip file and then obtain the contents of the zip file as a base64 encoded string. For example:

davicrig:run$ zip monitor.zip monitor.sh
adding: monitor.sh (deflated 30%)
davicrig:run$ base64 -w 0 monitor.zip
UEsDBBQAAAAIAHli81aGdruxqwAAAPUAAAAKABwAbW9uaXRvci5zaFVUCQAD1sa3ZNbGt2R1eAsAAQToAwAABOgDAABdj70OgkAQhHueYnIx/hAV8QGo1M5Q2NoQWPUC3JK9RULUd9fgT2E1yWRmv52mKwLKL4xR/FZz0EzUujNqdlZZ4HOxjZrA8aVtoJmtsHArLHavTlTxOQrpSk59iDuyrsQkSltNT3uqWfqtCEuEG3zvleqpGSBHsyGlXKkAhsQcEPJfcsPF0ZjZr1PaqkL8Mh4TYJ18sJ//ltwq4gR/Lolg/J00LMBwnwoTPAFQSwECHgMUAAAACAB5YvNWhna7sasAAAD1AAAACgAYAAAAAAABAAAA/4EAAAAAbW9uaXRvci5zaFVUBQAD1sa3ZHV4CwABBOgDAAAE6AMAAFBLBQYAAAAAAQABAFAAAADvAAAAAAA=

The long base64 encoded string can be copied to the clipboard. Once the contents of the zip file have been copied, we need to create a new configuration from the IBM App Connect operator using the following settings: When this configuration is made available to the Integration Server, it will unzip the provided zip file into the /home/aceuser/generic directory:

/home/aceuser/generic
sh-4.4$ ls
monitor.sh

Customizing an Integration Server to Run the Monitoring Script So the next step is to actually instruct the Integration Server to run the script during startup. To do this, we can use the server.conf.yaml "StartupScripts" stanza.
StartupScripts:
  FirstScript:
    command: /home/aceuser/generic/monitor.sh ${WORK_DIR}
    directErrorToCommandOutput: false
    includeCommandOutputInLogs: true
    stopServerOnError: true

There are a few important things to note about the settings here. In the "command" property we list the fully qualified script name, but we also pass in the ${WORK_DIR} token. This is a special token that is dynamically replaced with the Integration Server's work directory at runtime. For a CP4I environment we could have used a fully qualified path; however, in traditional on-premises deployments we need to use this token in preference to the MQSI_WORKPATH environment variable to cope with cases where a stand-alone Integration Server has its work directory located in a different location to MQSI_WORKPATH. It is also important to note that we must have directErrorToCommandOutput set to false. Setting this value to true prevents the script from exiting properly and causes the startup script to hang, which means that the Integration Server is never passed back control and never properly starts. So, once we have our configured server.conf.yaml snippet, we need to create a configuration to make this available to the IntegrationServer on the CP4I environment. We can do this by creating a Configuration of the "serverconf" type: Here the "Data" field in the form should contain the server.conf.yaml snippet we are changing, encoded into base64. Creating a Test Flow We are almost ready to configure the Integration Server itself, but first of all, we need to create an application which will actually simulate our error condition so that we can demonstrate the monitor script in action. To do this, simply create the following flow in the App Connect Enterprise Toolkit: Here we have 2 HTTP Input nodes, one which will simply immediately issue a successful reply and a second which will execute a Java Compute Node. The Java Compute Node will deliberately throw an error in order to simulate a real failure:

Java
// ----------------------------------------------------------
// Add user code below
if(true) {
    throw new OutOfMemoryError("oh no! out of memory");
}
// End of user code
// ----------------------------------------------------------

In order to make this available to the Integration Server, we must deploy this to a dashboard server and obtain the barfileURI as shown below: Making the Deployed Configurations Available to an Integration Server Now we are ready to configure an Integration Server to use the newly uploaded BAR file and the 2 custom configurations that we created earlier. This example assumes that we will be modifying an existing Integration Server; however, it is also possible to create a new Integration Server by following the same process. We make the deployed configurations available to the Integration Server by updating the "spec" section of the Integration Server yaml as shown below: When you hit the save button, the Operator will reconcile the changes and create a new pod with the Configurations deployed. If we examine the logs for the Integration Server, we can verify that the startup script has executed:

2023-07-19 13:48:39.418412: BIP1990I: Integration server 'test-oom' starting initialization; version '12.0.7.0' (64-bit)
2023-07-19 13:48:39.418960: BIP9560I: Script 'FirstScript' is about to run using command '/home/aceuser/generic/monitor.sh /home/aceuser/ace-server'.
Starting monitor script
Script started
2023-07-19 13:48:39.424804: BIP9565I: Script 'FirstScript' has run successfully.
2023-07-19 13:48:39.518768: BIP9905I: Initializing resource managers.

Testing the Solution We can test our script by issuing a curl command against the test flow that we just deployed:

sh-4.4$ curl -v http://localhost:7800/throwOOM
* Trying ::1...
* TCP_NODELAY set
* connect to ::1 port 7800 failed: Connection refused
* Trying 127.0.0.1...
* TCP_NODELAY set
* Connected to localhost (127.0.0.1) port 7800 (#0)
> GET /throwOOM HTTP/1.1
> Host: localhost:7800
> User-Agent: curl/7.61.1
> Accept: */*
>
command terminated with non-zero exit code: exit status 137

The terminal connection has closed. Here the terminal closes because, as soon as the runaceserver process is killed, OpenShift will terminate and restart the pod. The pod logs for the pod will show the following:

2023-07-19T14:25:00.122Z Signal received: terminated
2023-07-19T14:25:00.124Z Stopping metrics gathering
2023-07-19T14:25:00.125Z Stopping Integration Server
2023-07-19 14:25:00.127057: BIP1989I: Integration server is terminating due to a shutdown event.

If we log back in to the terminal once the pod is restarted, we can examine the previous entries in the events file and confirm that a message matching our error filter was received:

sh-4.4$ cat integration_server.test-oom.events.txt.1 | grep Out
2023-07-19 14:25:00.114267Z: [Thread 202] (Msg 3/4) BIP4367E: The method 'evaluate' in Java node 'Java Compute' has thrown the following exception: java.lang.OutOfMemoryError: oh no! out of memory.
2023-07-19 14:25:00.114306Z: [Thread 202] (Msg 4/4) BIP4395E: Java exception: 'java.lang.OutOfMemoryError'; thrown from class name: 'ThrowOOM_JavaCompute', method name: 'evaluate', file: 'ThrowOOM_JavaCompute.java', line: '24'

Extending to Other Actions Restarting the pod is not the only action we can take based on a trigger from a monitoring script. We can also execute arbitrary App Connect Enterprise Administration REST API commands. The following script, used in place of the original example, will enable service trace when the trigger condition is met:

echo "Starting monitor script"
nohup tail -n0 -F $1/log/*events* | awk '/OutOfMemoryError/ { system("echo \"Detected Error, enabling trace\"") system("curl --unix-socket /home/aceuser/ace-server/config/IntegrationServer.uds -X POST http://localhost:7600/apiv2/start-service-trace") }' 2>$1/log/monitor.out 1> $1/log/monitor.err &
echo "Script started"

Note that here we are using the unix domain socket interface to the web admin API rather than the tcp interface. This is the same interface used by mqsi* commands and relies on operating system user security. It is possible to use the normal admin port running on port 7600 to make a standard REST request; however, it is not possible to extract the basic auth credentials from the script itself, since the script runs as aceuser and the credentials are owned by the root user (for security reasons). Therefore, instead of passing the basic auth credentials into the script, where they would be available in plaintext to aceuser, I have opted to use the unix domain socket. If, instead of the original monitor.sh, we deploy this new updated copy, we can see that when the test flow is executed, the pod logs will show that trace is enabled:

2023-07-19 21:08:33.037476: BIP2297I: Integration Server service trace has been enabled due to user-initiated action.
Similarly, from within the pod we can confirm that trace files have been created:

sh-4.4$ find ace-server/ -name *trace*
ace-server/config/common/log/integration_server.test-oom.trace.0.txt
sh-4.4$ ls -la ace-server/config/common/log/integration_server.test-oom.trace.0.txt
-rw-r-----. 1 1000660000 1000660000 21852199 Jul 19 21:13 ace-server/config/common/log/integration_server.test-oom.trace.0.txt

Conclusion In this article, we have seen how to deploy a startup script to an Integration Server running in a CP4I environment and use this to monitor the error logs in order to take corrective action to restart the pods or to run arbitrary admin commands using the REST interface. Using a monitor script provides a great deal of flexibility for reacting to specific conditions for both recovery and troubleshooting purposes.
John Vester
Staff Engineer,
Marqeta @JohnJVester
Colin Domoney
Chief Technology Evangelist,
42Crunch
Saurabh Dashora
Founder,
ProgressiveCoder
Cameron HUNT
Integration Architect,
TeamWork France