Logstash for Synchronizing Elasticsearch with Databases

Maha Amer
8 min read · Apr 17, 2021

A common scenario for companies is to build their business functionality around one or more databases and then connect services to those databases to perform searches on the data, such as searching for product names and brand names in a supermarket's catalog.

When we use DBs and Elasticsearch together, we are confronted with two types of problems:

  1. Synchronization timeliness: How long does it take to synchronize database data with Elasticsearch? What is the maximum wait time acceptable to an application system? Generally, we must keep the synchronization time under one second. If it takes more than one minute, the process is considered offline synchronization.
  2. Data consistency: When DB data is frequently modified, how can we ensure consistency between Elasticsearch and the DBs? Within the allowed time range, data obtained by the application through a query is valid and acceptable; once the DB data has been overwritten by a change, the previously obtained data becomes invalid and unacceptable to the application.

Benefits of using Logstash

The problem we are trying to solve here, periodically sending data from DBs to Elasticsearch to keep the two in sync, could be solved with a Shell or Python script run by a cron job or any other job scheduler. However, that would deprive us of the benefits of Logstash and its plugin-based setup, such as the ready-made JDBC input, built-in scheduling, filtering, and Elasticsearch output that we use below.

Use Cases

In this article, we'll focus on Logstash from the ELK stack in order to periodically fetch data from MySQL and mirror it on Elasticsearch. This takes into consideration any changes to the MySQL data records, such as create, update, and delete, and replicates the same changes on the Elasticsearch documents.

We will cover two scenarios in the following steps:

  1. Creating an Elasticsearch index and indexing database records
  2. Automatically updating the Elasticsearch index based on changes to the database records (creation, update, deletion)

Implementing the concept

Imagine you are opening an online supermarket. We will create an initial database table products with a few thousand records containing the product name, category, brand name, price, and expiration date. This initial table will serve use case (1): building an index.
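
As an illustration, the products table could look roughly like the sketch below. The exact column names and types in the dump files may differ; isbn, name, category, and price are the fields the Logstash pipelines further down rely on.

CREATE TABLE products (
  isbn            VARCHAR(32)  PRIMARY KEY,  -- later used as the Elasticsearch document_id
  name            VARCHAR(255) NOT NULL,
  category        VARCHAR(100),
  brand           VARCHAR(100),
  price           DECIMAL(10, 2),
  expiration_date DATE
);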

We will create triggers on the products table that populate a journal table products_journal with all changes to the products table (e.g. create, update, delete). This means that whenever a record in the products table is created, updated, or deleted, the action will be recorded in the products_journal table, and the same action will be applied to the corresponding Elasticsearch document. This will serve as use case (2): updating the Elasticsearch index.
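
A minimal sketch of what that journal table and one of those triggers could look like follows; the columns journal_id, isbn, action_type, and action_time match what the automatic pipeline below expects, and analogous AFTER UPDATE and AFTER DELETE triggers would record 'update' and 'delete' actions.

CREATE TABLE products_journal (
  journal_id  BIGINT AUTO_INCREMENT PRIMARY KEY,
  isbn        VARCHAR(32) NOT NULL,
  action_type ENUM('create', 'update', 'delete') NOT NULL,
  action_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

DELIMITER $$
CREATE TRIGGER products_after_insert
AFTER INSERT ON products
FOR EACH ROW
BEGIN
  -- Record the new product so Logstash can pick it up on its next scheduled run
  INSERT INTO products_journal (isbn, action_type) VALUES (NEW.isbn, 'create');
END$$
DELIMITER ;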

Prerequisites

Docker and Docker Compose installed on your machine, since all the components below (MySQL, Elasticsearch, Kibana, and Logstash) run as containers.

Steps

1. Set up a MySQL database: Create a directory data/ where we will store MySQL dump files containing the pre-cleaned data for the products table, the triggers on the products table (on create, update, and delete), a new_arrivals table with products we will add to our catalog to simulate new records, and the products_journal table. Then define the MySQL service in your docker-compose.yaml file, mounting data/ into the container's init directory:

version: "3"
services:
  # add this:
  mysql:
    image: mysql:8
    container_name: sem_mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_RANDOM_ROOT_PASSWORD: "yes"
      MYSQL_DATABASE: products
      MYSQL_USER: reader
      MYSQL_PASSWORD: password_123
    volumes:
      # Dump files for initiating tables
      - ./data/:/docker-entrypoint-initdb.d/

2. Set up Elasticsearch and Kibana: To set up Elasticsearch and Kibana (without indexing any documents yet), add this to your docker-compose.yaml file:

version: "3"
services:
  ...
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
    container_name: sem_elasticsearch
    environment:
      - discovery.type=single-node
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - ./volumes/elasticsearch:/usr/share/elasticsearch/data
    logging:
      driver: "json-file"
      options:
        max-size: "10k"
        max-file: "10"
  kibana:
    image: docker.elastic.co/kibana/kibana:7.9.3
    container_name: sem_kibana
    environment:
      - "ELASTICSEARCH_URL=http://elasticsearch:9200"
      - "SERVER_NAME=127.0.0.1"
    ports:
      - 5601:5601
    depends_on:
      - elasticsearch

Note that the volumes definition is recommended so that the Elasticsearch index data is persisted on your local file system rather than living only inside the container.
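
With the MySQL, Elasticsearch, and Kibana services defined, bring the stack up with Docker Compose:

docker-compose up -d
docker-compose ps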

After executing the above commands, 3 containers should be up and running.

3. Set up Logstash to pipe data from MySQL to Elasticsearch:

To connect Logstash to MySQL, we will use the official MySQL JDBC driver (Connector/J), which is downloaded from dev.mysql.com in the Dockerfile below.

Let’s create a Dockerfile (named Dockerfile-logstash in the same directory) to pull a Logstash image, download the JDBC connector, and start a Logstash container. Add these lines to your Dockerfile:

FROM docker.elastic.co/logstash/logstash:7.9.3

# Download JDBC connector for Logstash
RUN curl -L --output "mysql-connector-java-8.0.22.tar.gz" "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz" \
  && tar -xf "mysql-connector-java-8.0.22.tar.gz" "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" \
  && mv "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" "mysql-connector-java-8.0.22.jar" \
  && rm -r "mysql-connector-java-8.0.22" "mysql-connector-java-8.0.22.tar.gz"

ENTRYPOINT ["/usr/local/bin/docker-entrypoint"]

Then add the following snippet to your docker-compose.yaml file:

version: "3"
services:
  ...
  logstash:
    build:
      context: .
      dockerfile: Dockerfile-logstash
    container_name: sem_logstash
    depends_on:
      - mysql
      - elasticsearch
    volumes:
      # We will explain why and how to add volumes below
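
As a sketch of what those volume mounts can look like: the container-side paths are the standard Logstash locations and must match the path.config values used in pipelines.yml below, while the host-side paths assume you keep pipelines.yml and the pipeline .conf files under a local logstash/ directory.

    volumes:
      # Which pipelines Logstash should run
      - ./logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml:ro
      # The pipeline .conf files referenced by pipelines.yml
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/:ro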

Logstash uses pipeline definitions to know where to get data from, how to filter it, and where it should go. We will define two pipelines: one for creating an Elasticsearch index from scratch (first scenario), and one for automatically propagating changes to the database records (second scenario).

Please check the documentation for an explanation of each of the fields used in the pipeline definitions.

4.a. First Scenario: Creating an Elasticsearch index for the products table

Create a file pipelines.yml containing:

- pipeline.id: from-scratch-pipeline
  path.config: "/usr/share/logstash/pipeline/product_index.conf"

Then create a file product_index.conf in the directory logstash/pipeline/ with the following content:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "reader"
    jdbc_password => "password_123"
    clean_run => true
    record_last_run => false
    statement => "SELECT * FROM products.products"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    action => "index"
    document_id => "%{isbn}"
  }
}

We define where to find the JDBC connector in jdbc_driver_library, set where to find MySQL in jdbc_connection_string, instruct the plugin to run from scratch with clean_run, and provide the SQL statement that fetches and formats the data records in statement.

The filter basically removes extra fields added by the plugin.

In the output, we define where to find the Elasticsearch host, set the name of the index to products (it can be a new or an existing index), define which action to perform (it can be index, create, update, or delete; see the docs), and set which field serves as the unique document ID in the products index.

You can also reference the SQL query through statement_filepath instead of statement, by adding a file product.sql containing as little as:

SELECT * FROM products.products

This builds our index with all the records available so far in the products table. Note that the query should NOT end with a semicolon (;).
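
For example, the jdbc input could then look like the sketch below; the container path for product.sql is an assumption, and the file must be mounted or copied into the Logstash container at that location.

jdbc {
  # ... same connection settings as above ...
  clean_run => true
  record_last_run => false
  # Read the SQL query from a file instead of an inline statement
  statement_filepath => "/usr/share/logstash/config/queries/product.sql"
}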

If the run is successful, you will see a message similar to Logstash shut down and the container will exit with code 0.
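
To launch this pipeline and watch for that message, you can, for example, build and start only the Logstash service, or follow the logs of the running container:

docker-compose up --build logstash
docker logs -f sem_logstash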

Now head to Kibana in your browser (at http://localhost:5601, the port we published above) and let's check that we have a products index with product data.
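
In Kibana's Dev Tools console, for example, you can verify the index and peek at a few documents; these are standard Elasticsearch APIs, not specific to this setup.

GET _cat/indices/products?v

GET products/_search
{
  "size": 3,
  "query": { "match_all": {} }
}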

4.b. Second Scenario: Automatically updating the Elasticsearch index for the products table

Most of the configuring and tweaking has been done in the previous part. We will simply add another pipeline that will automatically take charge of the update (replication).

In the file pipelines.yml, add these two lines:

- pipeline.id: automatic-pipeline
  path.config: "/usr/share/logstash/pipeline/automatic.conf"

In the same file, you may want to comment out the two previous lines related to the “product_index” part (first scenario) in order to instruct Logstash to run this automatic pipeline only.

Let’s create a file automatic.conf in the directory logstash/pipeline/ with the following content:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "reader"
    jdbc_password => "password_123"
    statement => "
      SELECT
        j.journal_id, j.action_type, j.isbn,
        b.name, b.category, b.price
      FROM products.products_journal j
      LEFT JOIN products.products b ON b.isbn = j.isbn
      WHERE j.journal_id > :sql_last_value
        AND j.action_time < NOW()
      ORDER BY j.journal_id"
    use_column_value => true
    tracking_column => "journal_id"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
  }
}

filter {
  if [action_type] == "create" or [action_type] == "update" {
    mutate { add_field => { "[@metadata][action]" => "index" } }
  } else if [action_type] == "delete" {
    mutate { add_field => { "[@metadata][action]" => "delete" } }
  }
  mutate {
    remove_field => ["@version", "@timestamp", "action_type"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    action => "%{[@metadata][action]}"
    document_id => "%{isbn}"
  }
}

We see a few extra parameters compared to the previous pipeline definition.

The schedule parameter has a cron-like syntax. Here we instruct Logstash to run this pipeline every 5 seconds with */5 * * * * * .

The website crontab.guru can help you read crontab expressions.

During each periodic run, the tracking_column parameter instructs Logstash to store the journal_id value of the last record fetched somewhere on the file system (see the documentation for last_run_metadata_path). On the following run, Logstash fetches only the records whose journal_id is greater than that stored value, which is injected into the query as :sql_last_value.
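
If you want that stored value to survive container rebuilds, you can pin it to an explicit path and mount that path as a volume; a minimal sketch (the path itself is an assumption) adds this to the jdbc input:

jdbc {
  # ... settings as above ...
  # Persist the :sql_last_value bookmark at an explicit, volume-mountable path
  last_run_metadata_path => "/usr/share/logstash/data/.logstash_jdbc_last_run"
}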

In the filter section, when the database action type is “create” or “update”, we set the Elasticsearch action to “index” so that new documents get indexed, and existing documents get re-indexed to update their value. Another approach to avoid re-indexing existing documents is to write a custom “update” script and to use the “upsert” action. For “delete” actions, the document on Elasticsearch gets deleted.
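
A minimal sketch of that upsert alternative for the create/update path is shown below; delete events would still need to be routed to a delete action, for example with a conditional in the output section.

elasticsearch {
  hosts => ["http://elasticsearch:9200"]
  index => "products"
  action => "update"
  # Treat the event as a partial document and create it if it does not exist yet
  doc_as_upsert => true
  document_id => "%{isbn}"
}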

Now create some new products (or update or delete existing ones), head to Kibana again, and check whether the changes appear in the products index.
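
For example, you can exercise all three trigger paths from a MySQL client; the statements below assume new_arrivals has the same column layout as products, and '<some-isbn>' is a placeholder for an isbn that exists in your data.

-- create: move a few new arrivals into the catalog
INSERT INTO products.products
SELECT * FROM products.new_arrivals
LIMIT 5;

-- update: change the price of an existing product
UPDATE products.products SET price = price * 0.9 WHERE isbn = '<some-isbn>';

-- delete: remove a product
DELETE FROM products.products WHERE isbn = '<some-isbn>';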
