How to remove all fields with NULL value in Logstash filter
Solution 1
The ruby filter can meet your requirement.
input {
  stdin {
  }
}
filter {
  csv {
    columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
    separator => "|"
  }
  # Walk a copy of the event's fields and remove every field whose value is nil
  ruby {
    code => "
      hash = event.to_hash
      hash.each do |k,v|
        if v.nil?
          event.remove(k)
        end
      end
    "
  }
}
output {
  stdout { codec => rubydebug }
}
You can use the ruby plugin to remove every field whose value is nil (Ruby's equivalent of null).
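To try the same idea outside Logstash, here is a minimal plain-Ruby sketch. It assumes the event can be modelled as an ordinary Hash; `remove_nil_fields` and `sample` are hypothetical names used only for illustration and are not part of the Logstash API.

```ruby
# Plain-Ruby stand-in for the filter body above.
# `fields` is an ordinary Hash, not a Logstash event (assumption for illustration).
def remove_nil_fields(fields)
  # Keep only the pairs whose value is not nil; the input Hash is left untouched.
  fields.reject { |_key, value| value.nil? }
end

# A few columns from the sample log line, with the empty "alert" column as nil.
sample = { 'num' => '2', 'action' => 'keyinst', 'alert' => nil }
p remove_nil_fields(sample)
```

Inside the ruby filter the removal has to go through `event.remove`, as in the configuration above, because the Hash returned by `event.to_hash` is what the code iterates over while the event itself is modified.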
Updated:
My environment is Windows Server 2008 with Logstash 1.4.1, and your log sample works for me. I have updated the configuration, input, and output.
Input:
2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Output:
{
"@version" => "1",
"@timestamp" => "2015-03-12T00:30:34.123Z",
"host" => "BENLIM",
"num" => "2",
"date" => "8Jun2012",
"time" => "16:52:39",
"orig" => "10.0.0.1",
"type" => "log",
"action" => "keyinst",
"i/f_name" => "daemon",
"i/f_dir" => "inbound",
"product" => "VPN-1 & FireWall-1",
"Internal_CA:" => "Certificate initialized",
"serial_num:" => "86232",
"dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}
Solution 2
If you need to remove all null, blank, and empty fields recursively (0 and false remain), this function might help. It uses the Ruby filter in Logstash. It is by no means elegant, but it seems to work pretty effectively.
filter {
  ruby {
    init => "
      def Compact(key)
        modifiedKey = nil
        parentKey = nil
        if key.kind_of?(String)
          if key.start_with?('[')
            modifiedKey = key
          else
            modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
          end
          parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
        end
        unless modifiedKey.nil?
          if event.get(modifiedKey).is_a?(Enumerable) &&
             (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
            event.remove(modifiedKey)
          elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
            event.remove(modifiedKey)
          end
          if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
             (event.get(parentKey).nil? || event.get(parentKey).empty?)
            event.remove(parentKey)
          end
        end
        if key == event.to_hash ||
           event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
          key = event.get(modifiedKey) unless modifiedKey.nil?
          key.each{ |k|
            Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
          }
        end
      rescue Exception => e
        puts %{ruby_exception_#{__method__.to_s} - #{e}}
      end
    "
    code => "
      Compact(event.to_hash)
    "
  }
}
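The recursive clean-up that Solution 2 describes can be illustrated on an ordinary nested Hash. The sketch below is not the function above and does not touch the Logstash event API; `blank?` and `deep_compact` are hypothetical helper names, and treating whitespace-only strings as blank is an assumption taken from the `strip.empty?` check in the original.

```ruby
# True for nil, whitespace-only strings, and empty Hashes/Arrays.
# 0 and false are deliberately NOT treated as blank.
def blank?(value)
  return true if value.nil?
  return value.strip.empty? if value.is_a?(String)
  return value.empty? if value.is_a?(Hash) || value.is_a?(Array)
  false
end

# Returns a cleaned copy of `value` with blank entries removed at every level.
# Containers that become empty after cleaning are removed by their parent.
def deep_compact(value)
  case value
  when Hash
    value.each_with_object({}) do |(key, child), out|
      cleaned = deep_compact(child)
      out[key] = cleaned unless blank?(cleaned)
    end
  when Array
    value.map { |child| deep_compact(child) }.reject { |child| blank?(child) }
  else
    value
  end
end

nested = { 'a' => nil, 'b' => '  ', 'c' => { 'd' => '', 'e' => {} }, 'f' => 0, 'g' => false }
p deep_compact(nested)
```

Building a cleaned copy avoids modifying a collection while iterating over it. Inside a ruby filter the result would still have to be applied to the event, for example by removing each blank field individually.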
Solution 3
ruby {
  init => "
    def removeEmptyField(event,h,name)
      h.each do |k,v|
        if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
          removeEmptyField(event,v,String.new(name.to_s) << '[' << k.to_s << ']')
        else
          if v == '' || v.to_s == '{}'
            event.remove(String.new(name.to_s) << '[' << k.to_s << ']')
          end
        end
      end
    end
  "
  code => "
    removeEmptyField event,event.to_hash,''
  "
}
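Solution 3 builds Logstash field references such as [parent][child] while it walks the event. The plain-Ruby sketch below only collects those references from a nested Hash instead of calling event.remove, so the path construction can be inspected on its own. `empty_field_refs` is a hypothetical name. The sketch also treats nil as empty, which follows the suggestion in the comment thread rather than the code as posted, and it descends only into Hashes, not Arrays.

```ruby
# Collects the field references ("[a][b]") of all empty values in a nested Hash.
# Empty here means nil, '' or {} (nil is an addition, see the lead-in).
def empty_field_refs(hash, prefix = '')
  refs = []
  hash.each do |key, value|
    ref = "#{prefix}[#{key}]"
    if value.is_a?(Hash) && !value.empty?
      # Descend into non-empty nested Hashes, extending the reference.
      refs.concat(empty_field_refs(value, ref))
    elsif value.nil? || value == '' || value == {}
      refs << ref
    end
  end
  refs
end

fields = { 'a' => '', 'b' => { 'c' => nil, 'd' => 'x' }, 'e' => {} }
p empty_field_refs(fields)
```

In a ruby filter, each collected reference could then be passed to event.remove.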
Solution 4
Check the skip_empty_columns option of the csv filter. It was really helpful in my use case. :)
Usage:
skip_empty_columns => true
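As a rough illustration of the effect of this option (not the csv filter's actual implementation), the plain-Ruby sketch below pairs column names with the values of one pipe-separated line and skips the empty ones. `parse_skipping_empty` is a hypothetical helper, and the four-column list is a shortened stand-in for the full column list in the question.

```ruby
# Pairs each column name with its value and drops columns whose value is empty.
# Illustration only: quoting and escaping rules of real CSV are ignored.
def parse_skipping_empty(line, columns, separator = '|')
  # A limit of -1 keeps trailing empty fields, so positions stay aligned.
  values = line.split(separator, -1)
  columns.zip(values).reject { |_column, value| value.nil? || value.empty? }.to_h
end

columns = %w[num date time alert]
p parse_skipping_empty('2|8Jun2012|16:52:39|', columns)
```

With the option enabled, the empty columns never become fields, so no separate ruby filter is needed for flat CSV input.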
tomer
Updated on June 04, 2022
Comments
-
tomer almost 2 years
I am reading a Check Point log file in CSV format with Logstash, and some fields have a null value.
I want to remove all fields with a null value.
I cannot foresee exactly which fields (keys) will be null, because the CSV file has 150 columns and I don't want to check each one of them.
Is it possible to write a dynamic filter in Logstash that removes any field with a null value?
My Logstash configuration file looks like this:
input {
  stdin {
    tags => "checkpoint"
  }
  file {
    type => "file-input"
    path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
    sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
    start_position => "beginning"
    tags => ["checkpoint","offline"]
  }
}
filter {
  if "checkpoint" in [tags] {
    csv {
      columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
      # remove_field => [ any fields with null value] how to do it please
      separator => "|"
    }
    # drop csv header
    if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
      drop { }
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  file {
    path => "output.txt"
  }
}
Here are some example log lines:
num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration 0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now 
valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||| 20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
-
tomer about 9 years
Hello Ben, thanks for helping me. I am testing your ruby answer, but the null fields still exist in the file output.
-
tomer about 9 years
Hi Alain, thanks for helping me. I am trying to use the ruby filter, but I am new to Ruby, and the output in my file still has null fields. I would be glad to get a code example for my specific problem that I can test.
-
Ban-Chuan Lim about 9 years
Is the field value nil or the string "null"? If you want to remove fields whose value is the string "null", use if v == 'null' in the ruby code.
-
tomer about 9 years
This does not work when the Logstash output is file or elasticsearch.
-
Ban-Chuan Lim about 9 years
But it works for me with either stdout or file. Can you provide a log sample?
-
tomer about 9 years
Hi Ben, and thanks for helping me. I updated my question and provided a log example.
-
Ban-Chuan Lim about 9 years
I have updated the answer. Your logs work for me. Maybe there is another problem you need to track down. You can use my config and test again.
-
tomer about 9 years
Can you try adding file to the output section? Please tell me whether there are null values in the output file.
-
Ban-Chuan Lim about 9 years
Yes! I also use the file output. The null-valued fields are eliminated.
-
tomer about 9 years
OK Ben, your answer works great. I had a mistake in my config file: in my filter section I used remove_field => [ ] on the csv plugin, so it ignored the fields removed by the ruby plugin.
-
st2rseeker about 8 years
There is also an option in the csv filter named skip_empty_columns (at least there is now), but the ruby was inventive. :)
-
Yalok Iy almost 5 years
I can confirm that the code above does what is expected: it removes fields with null value. Using Logstash 6.8.1. I've enhanced it a bit to remove fields with "-" value as well. Just make the second if line look like: if v == '' || v.to_s == '{}' || v == '-'
-
Yalok Iy almost 5 years
Actually, to remove fields with null value, one needs to add || v == nil as well.
-
Eric McLachlan over 3 years
Wow! That worked brilliantly! So nice and simple.