# Logstash pipeline: turns log events read from Kafka into count metrics.
#
# Flow:
#   1. input  - consume log events from a Kafka topic.
#   2. filter - drop events without [log][level], lower-case the level,
#               keep only "warning" and "error" events, then replace each
#               event's payload with a single "metric" field.
#   3. output - publish the resulting metric events to a Kafka topic.
#
# NOTE(review): zk_connect, the input topic_id, bootstrap_servers and the
# output topic_id are empty in this file; presumably they are filled in by
# deployment tooling - confirm before use.

input {
  kafka {
    ## zookeeper instances
    zk_connect => ""
    ## must be equal to the topic name which monasca-log-transformer uses to send logs
    topic_id => ""
    group_id => "monasca_log_metrics"
    consumer_id => "monasca_log_metrics"
    consumer_threads => "1"
  }
}

filter {
  # Drop logs that do not carry a log level; otherwise normalise the level
  # to lower case so the comparison below is case-insensitive.
  if "level" not in [log] {
    drop { periodic_flush => true }
  } else {
    ruby {
      code => "
        log_level = event['log']['level'].downcase
        event['log']['level'] = log_level
      "
    }
  }

  # Drop logs whose (lower-cased) level is neither warning nor error.
  if [log][level] not in ["warning", "error"] {
    drop { periodic_flush => true }
  }

  # Build the metric that replaces the log payload.
  ruby {
    code => "
      # The level was already lower-cased by the previous ruby filter;
      # downcase is kept as a harmless guard.
      log_level = event['log']['level'].downcase

      # Metric timestamp is the processing time in milliseconds (float),
      # not the timestamp of the original log event.
      log_ts = Time.now.to_f * 1000.0

      # Metric name is derived from the level, e.g. log.error
      metric_name = 'log.%s' % log_level

      # Each matching log event counts as one occurrence (value 1) and
      # inherits the dimensions of the log it was derived from.
      event['metric'] = {
        'name' => metric_name,
        'timestamp' => log_ts,
        'value' => 1,
        'dimensions' => event['log']['dimensions'],
        'value_meta' => {}
      }
    "
  }

  # Strip the original log payload and Logstash bookkeeping fields so that
  # they are not part of the event sent to the output.
  mutate {
    remove_field => ["log", "@version", "@timestamp", "log_level_original", "tags"]
  }
}

output {
  kafka {
    ## location of kafka brokers
    bootstrap_servers => ""
    ## must be equal to the topic name where metrics are sent
    topic_id => ""
    client_id => "monasca_log_metrics"
    compression_type => "none"
  }
}