input { kafka { zk_connect => "" topic_id => "" group_id => "monasca-log-persister" } } filter { date { match => ["[log][timestamp]", "UNIX"] target => "@timestamp" } date { match => ["creation_time", "UNIX"] target => "creation_time" } grok { match => { "[@timestamp]" => "^(?\d{4}-\d{2}-\d{2})" } } if "dimensions" in [log] { ruby { code => " fieldHash = event['log']['dimensions'] fieldHash.each do |key, value| event[key] = value end " } } mutate { add_field => { message => "%{[log][message]}" log_level => "%{[log][level]}" tenant => "%{[meta][tenantId]}" region => "%{[meta][region]}" } remove_field => ["@version", "host", "type", "tags" ,"_index_date", "meta", "log"] } } output { elasticsearch { index => "%{tenant}-%{index_date}" document_type => "log" hosts => ["127.0.0.1"] flush_size => 500 } }