How to remove all fields with NULL value in Logstash filter

15,033

Solution 1

The ruby filter can meet your requirement.

input {
        stdin {
        }
}

filter {
        csv {
                columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
                separator => "|"
        }
        ruby {
                code => "
                        # Walk a hash snapshot of the event and drop every field whose value is nil.
                        event.to_hash.each { |field, value| event.remove(field) if value.nil? }
                "
        }
}

output {
    stdout { codec => rubydebug }
}

You can use the ruby filter plugin to remove every field whose value is nil (Ruby's equivalent of null).

Updated:

This is my environment: Windows Server 2008 and Logstash 1.4.1. Your log sample works for me! I have updated the configuration, input, and output.

Input

2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||

Output:

{
        "@version" => "1",
      "@timestamp" => "2015-03-12T00:30:34.123Z",
            "host" => "BENLIM",
             "num" => "2",
            "date" => "8Jun2012",
            "time" => "16:52:39",
            "orig" => "10.0.0.1",
            "type" => "log",
          "action" => "keyinst",
        "i/f_name" => "daemon",
         "i/f_dir" => "inbound",
         "product" => "VPN-1 & FireWall-1",
    "Internal_CA:" => "Certificate initialized",
     "serial_num:" => "86232",
             "dn:" => "CN=fw-KO,O=sc-KO.KO.dc.obn8cx"
}

Solution 2

If you need to remove all null, blank, and empty fields recursively (0 and false remain), this function might be able to help. It uses the Ruby filter in Logstash. It's by no means elegant, but seems to work pretty effectively.

filter {
    ruby {
        init => "
            # Removes blank/empty fields from the event, descending into nested
            # Enumerable values.
            #
            # key - either a String field reference (bare name or bracketed form)
            #       or any other object; the filter code calls this with event.to_hash.
            #
            # Any exception raised in the body is caught at the bottom and printed
            # with puts instead of propagating.
            #
            # NOTE(review): event is not a parameter of this method; confirm that it
            # resolves in the scope where the ruby filter evaluates this init code.
            def Compact(key)
                modifiedKey = nil
                parentKey = nil

                # Normalise a String key into bracketed field-reference form.
                if key.kind_of?(String)
                    if key.start_with?('[')
                        modifiedKey = key
                    else
                        # Wrap the first run of non-bracket characters in [ ].
                        modifiedKey = key.sub( /([^\[^\]]*)/, '[\1]')
                    end

                # Parent reference = modifiedKey without its trailing [segment];
                # parentKey stays nil when nothing is left after stripping it.
                parentKey = modifiedKey.sub(/\[[^\[]+\]$/, '') unless modifiedKey.sub(/\[[^\[]+\]$/, '').strip.empty?
                end

                unless modifiedKey.nil?
                    # Remove the field when it is an empty Enumerable, or when its
                    # string form is blank after strip (nil.to_s is blank as well).
                    if event.get(modifiedKey).is_a?(Enumerable) &&
                    (event.get(modifiedKey).nil? || event.get(modifiedKey).empty?)
                         event.remove(modifiedKey)
                    elsif event.get(modifiedKey).to_s.strip.empty? || event.get(modifiedKey).nil?
                         event.remove(modifiedKey)
                     end

                    # The removal above may have emptied the parent container;
                    # remove the parent too if it is now an empty Enumerable.
                    if !parentKey.nil? && event.get(parentKey).is_a?(Enumerable) &&
                    (event.get(parentKey).nil? || event.get(parentKey).empty?)
                        event.remove(parentKey)
                    end
                end

               # Recurse when called with the whole event hash, or when the field
               # still holds an Enumerable value.
               if key == event.to_hash ||
               event.get((modifiedKey ? modifiedKey : '')).is_a?(Enumerable)
                   # For a field reference, iterate the current field value instead.
                   key = event.get(modifiedKey) unless modifiedKey.nil?
                   key.each{ |k|
                      # Only Enumerable elements are followed (iterating a Hash yields
                      # [key, value] pairs); k.first becomes the next reference segment.
                      Compact(%{#{modifiedKey ? modifiedKey : ''}[#{k.first}]}) if k.is_a?(Enumerable)
                   }
               end

               # NOTE(review): rescuing Exception is broader than StandardError;
               # failures are only printed, never re-raised.
               rescue Exception => e
                   puts %{ruby_exception_#{__method__.to_s} - #{e}}
           end
      "

     code => "
         Compact(event.to_hash)
     "
    }
}

Solution 3

ruby {
            init => "
                # Recursively removes empty fields from an event.
                #
                # event - object whose remove(field_reference) is called for every empty field
                # h     - Hash or Array currently being walked; the filter code passes event.to_hash
                # name  - bracketed field reference of h itself ('' for the top level)
                #
                # A Hash entry is removed when its value is nil, '' or renders as '{}'.
                # Non-empty nested Hashes and Arrays are descended into. Array elements
                # are addressed by index and are never removed themselves, so element
                # positions stay stable; only Hash entries nested inside them are removed.
                def removeEmptyField(event,h,name)
                    in_array = h.is_a?(Array)
                    # Pair each array element with its index so nested references read
                    # [field][0][key] rather than using element values as key names.
                    entries = in_array ? h.each_with_index.map { |item, idx| [idx, item] } : h
                    entries.each do |k,v|
                        ref = name.to_s + '[' + k.to_s + ']'
                        if (v.is_a?(Hash) || v.is_a?(Array)) && v.to_s != '{}'
                            removeEmptyField(event,v,ref)
                        elsif !in_array && (v.nil? || v == '' || v.to_s == '{}')
                            # nil is included so that null fields are removed as well.
                            event.remove(ref)
                        end
                    end
                end
            "
            code => "
                removeEmptyField event,event.to_hash,''
            "
    }

Solution 4

Check the skip_empty_columns option of the csv filter - was really helpful in my use case. :)

Usage:

skip_empty_columns => true
Share:
15,033

Related videos on Youtube

tomer
Author by

tomer

Updated on June 04, 2022

Comments

  • tomer
    tomer almost 2 years

    I am reading checkpoint log file with csv format with logstash and some fields have null value.

    i want to remove all fields with null value.

    i can not foresee exactly which fields(keys) will have null value because i have 150 columns in the csv file and i dont want check each one of them.

    is it possible to do a dynamic filter in logstash that will remove any fields with null value?

    my logstash configuration file look like that:

    input {
      stdin { tags => "checkpoint" } 
       file {
       type => "file-input"
       path =>  "D:\Browser Downloads\logstash\logstash-1.4.2\bin\checkpoint.csv"
       sincedb_path => "D:\Browser Downloads\logstash\logstash-1.4.2\bin\sincedb-access2"
       start_position => "beginning"
       tags => ["checkpoint","offline"]
      }
    }
    filter {
     if "checkpoint" in [tags] {
            csv {
            columns => ["num","date","time","orig","type","action","alert","i/f_name","i/f_dir","product","Internal_CA:","serial_num:","dn:","sys_message:","inzone","outzone","rule","rule_uid","rule_name","service_id","src","dst","proto","service","s_port","dynamic object","change type","message_info","StormAgentName","StormAgentAction","TCP packet out of state","tcp_flags","xlatesrc","xlatedst","NAT_rulenum","NAT_addtnl_rulenum","xlatedport","xlatesport","fw_message","ICMP","ICMP Type","ICMP Code","DCE-RPC Interface UUID","rpc_prog","log_sys_message","scheme:","Validation log:","Reason:","Serial num:","Instruction:","fw_subproduct","vpn_feature_name","srckeyid","dstkeyid","user","methods:","peer gateway","IKE:","CookieI","CookieR","msgid","IKE notification:","Certificate DN:","IKE IDs:","partner","community","Session:","L2TP:","PPP:","MAC:","OM:","om_method:","assigned_IP:","machine:","reject_category","message:","VPN internal source IP","start_time","connection_uid","encryption failure:","vpn_user","Log ID","message","old IP","old port","new IP","new port","elapsed","connectivity_state","ctrl_category","description","description ","severity","auth_status","identity_src","snid","src_user_name","endpoint_ip","src_machine_name","src_user_group","src_machine_group","auth_method","identity_type","Authentication trial","roles","dst_user_name","dst_machine_name","spi","encryption fail reason:","information","error_description","domain_name","termination_reason","duration"]
          #  remove_field => [ any fields with null value] how to do it please 
            separator => "|"
            }
        # drop csv header
            if [num] == "num" and [date] == "date" and [time] == "time" and [orig] == "orig" {
            drop { }
        }
        }
      }
    
    }
    output {
       stdout {
        codec => rubydebug 
      }
       file {
          path => "output.txt"
       }
    

    HERE I ATTACH SOME LOGS EXAMPLE:

    num|date|time|orig|type|action|alert|i/f_name|i/f_dir|product|Internal_CA:|serial_num:|dn:|sys_message:|inzone|outzone|rule|rule_uid|rule_name|service_id|src|dst|proto|service|s_port|dynamic object|change type|message_info|StormAgentName|StormAgentAction|TCP packet out of state|tcp_flags|xlatesrc|xlatedst|NAT_rulenum|NAT_addtnl_rulenum|xlatedport|xlatesport|fw_message|ICMP|ICMP Type|ICMP Code|DCE-RPC Interface UUID|rpc_prog|log_sys_message|scheme:|Validation log:|Reason:|Serial num:|Instruction:|fw_subproduct|vpn_feature_name|srckeyid|dstkeyid|user|methods:|peer gateway|IKE:|CookieI|CookieR|msgid|IKE notification:|Certificate DN:|IKE IDs:|partner|community|Session:|L2TP:|PPP:|MAC:|OM:|om_method:|assigned_IP:|machine:|reject_category|message:|VPN internal source IP|start_time|connection_uid|encryption failure:|vpn_user|Log ID|message|old IP|old port|new IP|new port|elapsed|connectivity_state|ctrl_category|description|description |severity|auth_status|identity_src|snid|src_user_name|endpoint_ip|src_machine_name|src_user_group|src_machine_group|auth_method|identity_type|Authentication trial|roles|dst_user_name|dst_machine_name|spi|encryption fail reason:|information|error_description|domain_name|termination_reason|duration
    0|8Jun2012|16:33:35|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
     1|8Jun2012|16:36:34|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|started|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
     2|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Certificate initialized|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
     3|8Jun2012|16:52:39|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Initiated certificate is now valid|86232|CN=fw-KO,O=sc-KO.KO.dc.obn8cx|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
     4|8Jun2012|16:55:44|10.0.0.1|log|keyinst||daemon|inbound|VPN-1 & FireWall-1|Issued empty CRL 1|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
    20|8Jun2012|16:58:28|10.0.0.1|log|accept||eth1|inbound|VPN-1 & FireWall-1|||||Internal|External|1|{2A42C8CD-148D-4809-A480-3171108AD6C7}||domain-udp|192.168.100.1|198.32.64.12|udp|53|1036|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
    
  • tomer
    tomer about 9 years
    Hello Ben thanks for helping me, i am testing you ruby answer but in the output to file the null fields still exist...
  • tomer
    tomer about 9 years
    Hi Alain thnks for helping me, i am tring to use ruby filter but i am new to ruby.... and the result output to my file still got a null fields, i will be a glad to get code example to my specific problem that i can test
  • Ban-Chuan Lim
    Ban-Chuan Lim about 9 years
    Can I know whether the field value is nil or the string "null"? If you want to remove fields whose value is the string "null", use "if v == 'null'" in the ruby code.
  • tomer
    tomer about 9 years
    this not working when your logstash output is file or elasticsearch
  • Ban-Chuan Lim
    Ban-Chuan Lim about 9 years
    But it's worked on me in either stdout or file. Can you provide your log sample?
  • tomer
    tomer about 9 years
    Hi Ben and 10x for helping me. i update my question and provided a log example
  • Ban-Chuan Lim
    Ban-Chuan Lim about 9 years
    I have update the answer. Your logs is worked at me. Maybe you have try to figure out other problem. You can use my config and test again.
  • tomer
    tomer about 9 years
    can you try add file to ouput section. please tell me if there is null value in the output file
  • Ban-Chuan Lim
    Ban-Chuan Lim about 9 years
    Yes! I also use the file output. The null value field will be eliminated
  • tomer
    tomer about 9 years
    Ok Ben your answer work great, i had a mistake in my config file because that in my filter section i use remove_field => [ ] on the csv plugin. so its ignore the removed fields from ruby plugin
  • st2rseeker
    st2rseeker about 8 years
    There is also an option in the csv filter named skip_empty_columns (at least there is now) - but the ruby was inventive. :)
  • Yalok Iy
    Yalok Iy almost 5 years
    I can confirm that the code above does what is expected – remove fields with null value. Using logstash 6.8.1. I've enhanced it a bit to remove fields with "-" value as well. Just make the second if line to look like: if v == '' || v.to_s == '{}' || v == '-'
  • Yalok Iy
    Yalok Iy almost 5 years
    Actually, to remove fields with null value, one needs to add || v == nil as well.
  • Eric McLachlan
    Eric McLachlan over 3 years
    Wow! That worked brilliantly! So nice and simple.