Logstash uses filters in the middle of the pipeline between input and output. The filters of Logstash measures manipulate and create events like Apache-Access. Many filter plugins used to manage the events in Logstash. Here, in an example of the Logstash Aggregate Filter, we are filtering the duration every SQL transaction in a database and computing the total time.
Installing the Aggregate Filter Plugin using the Logstash-plugin utility. The Logstash-plugin is a batch file for windows in bin folder in Logstash.
>logstash-plugin install logstash-filter-aggregate
In this configuration, you can see three ‘if’ statements for Initializing, Incrementing, and generating the total duration of transaction, i.e., the sql_duration. The aggregate plugin is used to add the sql_duration, present in every event of the input log.
input { file { path => "C:/tpwork/logstash/bin/log/input.log" } } filter { grok { match => [ "message", "%{LOGLEVEL:loglevel} - %{NOTSPACE:taskid} - %{NOTSPACE:logger} - %{WORD:label}( - %{INT:duration:int})?" ] } if [logger] == "TRANSACTION_START" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] = 0" map_action => "create" } } if [logger] == "SQL" { aggregate { task_id => "%{taskid}" code => "map['sql_duration'] ||= 0 ; map['sql_duration'] += event.get('duration')" } } if [logger] == "TRANSACTION_END" { aggregate { task_id => "%{taskid}" code => "event.set('sql_duration', map['sql_duration'])" end_of_task => true timeout => 120 } } } output { file { path => "C:/tpwork/logstash/bin/log/output.log" } }
We can run Logstash by using the following command.
>logstash –f logstash.conf
The following code block shows the input log data.
INFO - 48566 - TRANSACTION_START - start INFO - 48566 - SQL - transaction1 - 320 INFO - 48566 - SQL - transaction1 - 200 INFO - 48566 - TRANSACTION_END - end
As specified in the configuration file, the last ‘if’ statement where the logger is – TRANSACTION_END, which prints the total transaction time or sql_duration. This has been highlighted in yellow color in the output.log.
{ "path":"C:/tpwork/logstash/bin/log/input.log","@timestamp": "2016-12-22T19:04:37.214Z", "loglevel":"INFO","logger":"TRANSACTION_START","@version": "1","host":"wcnlab-PC", "message":"8566 - TRANSACTION_START - start\r","tags":[] } { "duration":320,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.366Z","loglevel":"INFO","logger":"SQL", "@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 320\r","taskid":"48566","tags":[] } { "duration":200,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.373Z","loglevel":"INFO","logger":"SQL", "@version":"1","host":"wcnlab-PC","label":"transaction1", "message":" INFO - 48566 - SQL - transaction1 - 200\r","taskid":"48566","tags":[] } { "sql_duration":520,"path":"C:/tpwork/logstash/bin/log/input.log", "@timestamp":"2016-12-22T19:04:38.380Z","loglevel":"INFO","logger":"TRANSACTION_END", "@version":"1","host":"wcnlab-PC","label":"end", "message":" INFO - 48566 - TRANSACTION_END - end\r","taskid":"48566","tags":[] }