A common scenario for companies is to build their business functionality around one or more databases, and then connect services to those databases to search the data — for example, searching product names and brand names in a supermarket catalog.
When we use DBs and Elasticsearch together, we are confronted with two types of problems:
- Synchronization timeliness: How long does it take for database changes to reach Elasticsearch, and what is the maximum delay the application can accept? Generally, synchronization should take under one second; if it takes more than a minute, the process is effectively offline synchronization.
- Data consistency: When DB data is frequently changed and modified, how do we keep Elasticsearch and the DBs consistent? Within the allowed time window, the data an application obtains through a query is valid and acceptable; once the DB record has been overwritten by a change, the stale data becomes invalid and unacceptable to the application.
Benefits of using Logstash
The problem we are trying to solve here — periodically sending data from DBs to Elasticsearch to keep the two in sync — could be solved with a shell or Python script run by a cron job or any job scheduler, but that would deprive us of the benefits we get from Logstash and its plugin-based setup:
- Although we focus on MySQL here, Logstash can ingest data from many sources, making it a centralized data input node.
- Logstash offers the possibility to parse, transform, and filter data on the fly, as it passes from source to destination.
- As with input sources, there are many output destinations available — Elasticsearch being the go-to output here.
- As part of the ELK Stack, everything will come together nice and smoothly later with Elasticsearch and Kibana for metrics and visualization.
Use Cases
In this article, we’ll focus on Logstash from the ELK stack in order to periodically fetch data from MySQL and mirror it on Elasticsearch. Any changes to the MySQL data records — create, update, and delete — will be replicated on the corresponding Elasticsearch documents.
We will cover two scenarios in the following steps:
- Creating an Elasticsearch index and indexing database records
- Automatically updating the Elasticsearch index as the database records change (creation, update, deletion).
Implementing the concept
Imagine you are opening an online supermarket. We will create an initial database table products with a few thousand records containing product name, category, brand name, price, and expiration date. This initial table will serve as use case (1), building an index.
We will create triggers on the products table that populate a journal table products_journal with all changes on the products table (create, update, delete). This means that whenever a record in the products table is created, updated, or deleted, the action is recorded in the products_journal table, and the same action is then applied to the corresponding Elasticsearch document. This will serve as use case (2), updating the Elasticsearch index.
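As an illustration, the insert trigger could be sketched as below. The journal column names (journal_id, isbn, action_type, action_time) are assumptions derived from how the journal is queried later in this article; similar triggers would cover UPDATE and DELETE with the corresponding action_type values.

-- Minimal sketch: record each insert so Logstash can replay it against Elasticsearch.
-- Assumed journal columns: journal_id (AUTO_INCREMENT), isbn, action_type, action_time.
CREATE TRIGGER products_after_insert
AFTER INSERT ON products
FOR EACH ROW
  INSERT INTO products_journal (isbn, action_type, action_time)
  VALUES (NEW.isbn, 'create', NOW());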
Prerequisites
All you need is Docker and Docker Compose installed: every component below (MySQL, Elasticsearch, Kibana, Logstash) runs as a container defined in a docker-compose.yaml file.
Steps
1. Set up a MySQL database: Create a directory data/ where we’ll store MySQL dump files with the pre-cleaned product data for the products table, the triggers for the products table (on create, update, and delete), a new_arrivals table with products we will add to our catalog to simulate new records, and the products_journal table. Then declare the MySQL service in your docker-compose.yaml file, mounting data/ as the initialization directory:
version: "3"
services:
# add this:
mysql:
image: mysql:8
container_name: sem_mysql
ports:
- 3306:3306
environment:
MYSQL_RANDOM_ROOT_PASSWORD: "yes"
MYSQL_DATABASE: products
MYSQL_USER: reader
MYSQL_PASSWORD: password_123
volumes:
# Dump files for initiating tables
- ./data/:/docker-entrypoint-initdb.d/
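The exact dump file names are up to you (the ones below are hypothetical); the MySQL image runs the files in /docker-entrypoint-initdb.d/ in alphabetical order, so numbering them ensures the tables exist before the triggers are created.

data/
├── 01_products.sql           # schema and initial records for products
├── 02_new_arrivals.sql       # schema and records for new_arrivals
├── 03_products_journal.sql   # schema for the journal table
└── 04_triggers.sql           # create/update/delete triggers on products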
2. Set up Elasticsearch and Kibana: To set up Elasticsearch (without indexing any documents yet), add this to your docker-compose.yaml file:
version: "3"
services:
...
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:7.9.3
container_name: sem_elasticsearch
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
volumes:
- ./volumes/elasticsearch:/usr/share/elasticsearch/data
logging:
driver: "json-file"
options:
max-size: "10k"
max-file: "10"
kibana:
image: docker.elastic.co/kibana/kibana:7.9.3
container_name: sem_kibana
environment:
- "ELASTICSEARCH_URL=http://elasticsearch:9200"
- "SERVER_NAME=127.0.0.1"
ports:
- 5601:5601
depends_on:
- elasticsearch
Note that the volumes definition is recommended so that the Elasticsearch index data is persisted on your file system instead of living only inside the container.
After bringing the stack up, three containers (MySQL, Elasticsearch, and Kibana) should be up and running.
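One way to bring these services up and check on them, run from the directory containing docker-compose.yaml (the credentials in the sanity check are the ones defined in the compose file above):

docker-compose up -d
docker ps --format "table {{.Names}}\t{{.Status}}"

# Optional sanity check: the dump files should have populated the products table
docker exec sem_mysql mysql -u reader -ppassword_123 -e "SELECT COUNT(*) FROM products.products;"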
3. Set up Logstash to pipe data from MySQL to Elasticsearch:
To connect Logstash to MySQL, we will use the official JDBC driver (MySQL Connector/J), which the Dockerfile below downloads from dev.mysql.com.
Let’s create a Dockerfile (named Dockerfile-logstash, in the same directory) to pull a Logstash image, download the JDBC connector, and start a Logstash container. Add these lines to your Dockerfile:
FROM docker.elastic.co/logstash/logstash:7.9.3

# Download JDBC connector for Logstash
RUN curl -L --output "mysql-connector-java-8.0.22.tar.gz" "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-8.0.22.tar.gz" \
    && tar -xf "mysql-connector-java-8.0.22.tar.gz" "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" \
    && mv "mysql-connector-java-8.0.22/mysql-connector-java-8.0.22.jar" "mysql-connector-java-8.0.22.jar" \
    && rm -r "mysql-connector-java-8.0.22" "mysql-connector-java-8.0.22.tar.gz"

ENTRYPOINT ["/usr/local/bin/docker-entrypoint"]
Then add the following snippet to your docker-compose.yaml file:
version: "3"
services:
...
logstash:
build:
context: .
dockerfile: Dockerfile-logstash
container_name: sem_logstash
depends_on:
- mysql
- elasticsearch
volumes:
# We will explain why and how to add volumes below
Logstash uses pipeline definitions to know where to get data from, how to filter it, and where it should go. We will define two pipelines: one for creating an Elasticsearch index from scratch (first scenario), and one that automatically applies changes to the database records to that index (second scenario).
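This is also where the volumes come in: the pipeline files we create below have to be mounted into the Logstash container. Assuming you keep them in a local logstash/ directory (the host paths are an assumption; the container paths are the standard locations in the official image), the volumes section could look like this:

  logstash:
    ...
    volumes:
      - ./logstash/pipelines.yml:/usr/share/logstash/config/pipelines.yml
      - ./logstash/pipeline/:/usr/share/logstash/pipeline/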
Please check the documentation for an explanation of each of the fields used in the pipeline definition:
- Input: JDBC Input Plugin
- Filter: Mutate (remove_field)
- Output: Elasticsearch Output Plugin
4.a. First Scenario — Creating an Elasticsearch index for the products table:
Create a file pipelines.yml containing:
- pipeline.id: from-scratch-pipeline
path.config: "/usr/share/logstash/pipeline/product_index.conf"
Then create a file product_index.conf in the directory logstash/pipeline/ with the following content:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "reader"
    jdbc_password => "password_123"
    clean_run => true
    record_last_run => false
    statement => "SELECT * FROM products.products"
  }
}

filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    action => "index"
    document_id => "%{isbn}"
  }
}
We define where to find the JDBC connector in jdbc_driver_library, where to find MySQL in jdbc_connection_string, instruct the plugin to run from scratch with clean_run, and provide the SQL statement that fetches the data records in statement.
The filter basically removes extra fields added by the plugin.
In the output, we define where to find the Elasticsearch host, set the name of the index to products (it can be a new or an existing index), define which action to perform (it can be index, create, update, or delete — see the docs), and set which field will serve as the unique document ID in the products index.
You can also reference the SQL query from a file by using statement_filepath instead of statement, adding a file product.sql with as little as:
SELECT * FROM products.products
to get the index built with all the records available so far in the products table. Note that the query should NOT end with a semicolon (;).
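In that case, the jdbc input block would point to the file instead; the container path below assumes you mount product.sql alongside the pipeline files:

statement_filepath => "/usr/share/logstash/pipeline/product.sql"

With the pipeline files mounted, you can run this one-off import by building and starting only the Logstash service, for example:

docker-compose up --build logstash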
If the run is successful, you will see a message similar to Logstash shut down, and the container will exit with code 0.
Now head to Kibana in your browser (at http://localhost:5601, for example) and check whether we have a products index with the products data.
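For instance, from Kibana’s Dev Tools console:

GET products/_count

GET products/_search
{ "size": 3 }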
4.b. Second Scenario — automatically updating an Elasticsearch index for products table:
Most of the configuring and tweaking has been done in the previous part. We will simply add another pipeline that will automatically take charge of the update (replication).
In the file pipelines.yml, add these two lines:
- pipeline.id: automatic-pipeline
path.config: "/usr/share/logstash/pipeline/automatic.conf"
In the same file, you may want to comment out the two previous lines related to the “product_index” part (first scenario) in order to instruct Logstash to run this automatic pipeline only.
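The resulting pipelines.yml would then look like this:

# - pipeline.id: from-scratch-pipeline
#   path.config: "/usr/share/logstash/pipeline/product_index.conf"

- pipeline.id: automatic-pipeline
  path.config: "/usr/share/logstash/pipeline/automatic.conf"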
Let’s create a file automatic.conf in the directory logstash/pipeline/ with the following content:
input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/mysql-connector-java-8.0.22.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://mysql:3306"
    jdbc_user => "reader"
    jdbc_password => "password_123"
    statement => "
      SELECT
        j.journal_id, j.action_type, j.isbn,
        b.name, b.category, b.price
      FROM products.products_journal j
      LEFT JOIN products.products b ON b.isbn = j.isbn
      WHERE j.journal_id > :sql_last_value
        AND j.action_time < NOW()
      ORDER BY j.journal_id"
    use_column_value => true
    tracking_column => "journal_id"
    tracking_column_type => "numeric"
    schedule => "*/5 * * * * *"
  }
}

filter {
  if [action_type] == "create" or [action_type] == "update" {
    mutate { add_field => { "[@metadata][action]" => "index" } }
  } else if [action_type] == "delete" {
    mutate { add_field => { "[@metadata][action]" => "delete" } }
  }

  mutate {
    remove_field => ["@version", "@timestamp", "action_type"]
  }
}

output {
  # stdout { codec => rubydebug { metadata => true } }
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "products"
    action => "%{[@metadata][action]}"
    document_id => "%{isbn}"
  }
}
We see a few extra parameters compared to the previous pipeline definition.
The schedule parameter has a cron-like syntax (extended with a leading field for seconds). Here we instruct Logstash to run this pipeline every 5 seconds with */5 * * * * *.
The website crontab.guru can help you read crontab expressions.
During each periodic run, the tracking_column parameter instructs Logstash to store the journal_id value of the last record fetched somewhere on the file system (see the documentation for last_run_metadata_path). On the following run, Logstash will fetch records starting from journal_id + 1, where journal_id is the value stored during the current run.
In the filter section, when the database action type is “create” or “update”, we set the Elasticsearch action to “index” so that new documents get indexed and existing documents get re-indexed with their new values. Another approach to avoid re-indexing existing documents is to write a custom “update” script and use the “upsert” action. For “delete” actions, the document is deleted from Elasticsearch.
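A minimal sketch of that alternative for the create/update case, using the elasticsearch output’s update action with doc_as_upsert (a simplification of the scripted update mentioned above; delete events would still use the delete action as before):

elasticsearch {
  hosts => ["http://elasticsearch:9200"]
  index => "products"
  # update the existing document, or create it if it does not exist yet
  action => "update"
  doc_as_upsert => true
  document_id => "%{isbn}"
}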
Now create new products, head to Kibana again, and check whether the new products show up in the products index.
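For instance, assuming the new_arrivals table has the same columns as products (an assumption), you could copy one row into the catalog:

-- Hypothetical example: move one new arrival into the catalog;
-- the INSERT trigger journals it, and the scheduled pipeline picks it up within a few seconds.
INSERT INTO products.products
SELECT * FROM products.new_arrivals LIMIT 1;

Then, in Kibana’s Dev Tools, GET products/_count should show the document count increase.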