#!/usr/bin/env python
# -*- coding: utf-8 -*-
import argparse
from datetime import datetime
import json


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("file", help="Path to the log file to be analyzed")
    parser.add_argument("-v", "--verbosity", action="count", default=0,
                        help="increase output verbosity")
    parser.add_argument("-s", "--start-time", default="2000-01-01T00:00:00.000Z",
                        help="Starting timestamp for analyzing activity")
    parser.add_argument("-e", "--end-time", default="2199-01-01T00:00:00.000Z",
                        help="Ending timestamp for analyzing activity")
    parser.add_argument("-g", "--granularity", default="DAYS",
                        choices=["MINUTES", "HOURS", "DAYS"],
                        help="Granularity of API activity analysis. Can be MINUTES, HOURS, or DAYS")
    parser.add_argument("-t", "--time-format", default="%Y-%m-%dT%H:%M:%S.%fZ",
                        help="Format for timestamps in log files")
    return parser.parse_args()


def calc_mean(nums):
    # Avoid shadowing the built-in sum() and guard against empty input,
    # which would otherwise raise ZeroDivisionError.
    if not nums:
        return 0
    total = 0
    for n in nums:
        total += n
    return total / len(nums)


def parse_json_line(log_line):
    """Return the parsed JSON object for a log line, or None if it is not valid JSON."""
    try:
        return json.loads(log_line)
    except ValueError:
        return None


class ImmutaLogAnalyzer(object):

    def __init__(self, options):
        self.start_time = options["start_time"]
        self.end_time = options["end_time"]
        self.granularity = options["granularity"]
        self.time_format = options["time_format"]

    def _round_timestamp(self, timestamp):
        # Truncate the timestamp down to the configured granularity bucket.
        if self.granularity == "MINUTES":
            return timestamp.replace(microsecond=0, second=0)
        elif self.granularity == "HOURS":
            return timestamp.replace(microsecond=0, second=0, minute=0)
        else:
            return timestamp.replace(microsecond=0, second=0, minute=0, hour=0)

    def analyze(self, logs):
        time_data = {}
        nn_api_call_count = 0
        other_api_call_count = 0
        for line in logs:
            # Parse each line once; skip lines that are not JSON.
            j = parse_json_line(line)
            if j is None:
                continue
            rounded_time = self._round_timestamp(
                datetime.strptime(j["timestamp"], self.time_format))
            if rounded_time < self.start_time or rounded_time > self.end_time:
                continue
            rounded_time_str = rounded_time.strftime(self.time_format)
            if rounded_time_str not in time_data:
                time_data[rounded_time_str] = {
                    "nn_response_times": [],
                    "other_response_times": []
                }
            path = j.get("path")
            if path:
                # Calls under /hdfs are NameNode (HDFS) API calls; everything
                # else is bucketed as "other".
                if path.startswith("/hdfs"):
                    nn_api_call_count += 1
                    time_data[rounded_time_str]["nn_response_times"].append(j["responseTime"])
                else:
                    other_api_call_count += 1
                    time_data[rounded_time_str]["other_response_times"].append(j["responseTime"])
        for k in sorted(time_data.keys()):
            nn_times = time_data[k]["nn_response_times"]
            if not nn_times:
                # Buckets with no HDFS calls have no response-time stats to report;
                # max() on an empty list would also raise ValueError here.
                continue
            time_data[k]["nn_response_time_mean"] = round(calc_mean(nn_times), 3)
            time_data[k]["nn_response_time_max"] = max(nn_times)
            msg = "{} -- HDFS API Calls: {}, Mean ResponseTime: {} ms, Max ResponseTime: {} ms".format(
                k,
                len(nn_times),
                time_data[k]["nn_response_time_mean"],
                time_data[k]["nn_response_time_max"]
            )
            print(msg)
        print("HDFS API Calls: {}".format(nn_api_call_count))
        print("Other API Calls: {}".format(other_api_call_count))


if __name__ == "__main__":
    args = parse_args()
    options = {
        "granularity": args.granularity,
        "time_format": args.time_format,
    }
    # Parse the time-window bounds with the same format used for log timestamps.
    options["start_time"] = datetime.strptime(args.start_time, options["time_format"])
    options["end_time"] = datetime.strptime(args.end_time, options["time_format"])
    analyzer = ImmutaLogAnalyzer(options)
    with open(args.file) as f:
        analyzer.analyze(f)
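
# --- Example usage (a minimal sketch, not part of the original script) -------
# The analyzer only needs an iterable of lines, so it can be exercised without
# a real log file. The field names below ("timestamp", "path", "responseTime")
# are the ones this script reads; the sample values and the file name
# immuta.log are hypothetical.
#
#   $ python immuta_log_analyzer.py -g HOURS -s 2021-01-01T00:00:00.000Z immuta.log
#
# where immuta.log contains one JSON object per line, e.g.:
#
#   {"timestamp": "2021-01-01T12:00:01.000Z", "path": "/hdfs/getFileInfo", "responseTime": 12}
#   {"timestamp": "2021-01-01T12:00:02.000Z", "path": "/api/v2/query", "responseTime": 30}
#
# Or, programmatically, assuming the same defaults the CLI would supply:
#
#   import io
#   sample = io.StringIO(
#       '{"timestamp": "2021-01-01T12:00:01.000Z", "path": "/hdfs/getFileInfo", "responseTime": 12}\n'
#       '{"timestamp": "2021-01-01T12:00:02.000Z", "path": "/api/v2/query", "responseTime": 30}\n'
#   )
#   analyzer = ImmutaLogAnalyzer({
#       "granularity": "MINUTES",
#       "time_format": "%Y-%m-%dT%H:%M:%S.%fZ",
#       "start_time": datetime(2000, 1, 1),
#       "end_time": datetime(2199, 1, 1),
#   })
#   analyzer.analyze(sample)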