| ''' |
| Classes related to searching logs |
| ''' |
| |
| __copyright__=''' |
| Copyright (C) 2014 Andrew Beekhof <andrew@beekhof.net> |
| Licensed under the GNU GPL. |
| ''' |
| |
| # |
| # This program is free software; you can redistribute it and/or |
| # modify it under the terms of the GNU General Public License |
| # as published by the Free Software Foundation; either version 2 |
| # of the License, or (at your option) any later version. |
| # |
| # This program is distributed in the hope that it will be useful, |
| # but WITHOUT ANY WARRANTY; without even the implied warranty of |
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| # GNU General Public License for more details. |
| # |
| # You should have received a copy of the GNU General Public License |
| # along with this program; if not, write to the Free Software |
| # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. |
| |
| import time, re, os, threading |
| |
| from cts.remote import * |
| from cts.logging import * |
| |
| has_log_watcher = {} |
| log_watcher_file = "cts_log_watcher.py" |
| log_watcher_bin = CTSvars.CRM_DAEMON_DIR + "/" + log_watcher_file |
| log_watcher = """ |
| import sys, os, fcntl |
| |
| ''' |
| Remote logfile reader for CTS |
| Reads a specified number of lines from the supplied offset |
| Returns the current offset |
| |
| Contains logic for handling truncation |
| ''' |
| |
| limit = 0 |
| offset = 0 |
| prefix = '' |
| filename = '/var/log/messages' |
| |
| skipthis=None |
| args=sys.argv[1:] |
| for i in range(0, len(args)): |
| if skipthis: |
| skipthis=None |
| continue |
| |
| elif args[i] == '-l' or args[i] == '--limit': |
| skipthis=1 |
| limit = int(args[i+1]) |
| |
| elif args[i] == '-f' or args[i] == '--filename': |
| skipthis=1 |
| filename = args[i+1] |
| |
| elif args[i] == '-o' or args[i] == '--offset': |
| skipthis=1 |
| offset = args[i+1] |
| |
| elif args[i] == '-p' or args[i] == '--prefix': |
| skipthis=1 |
| prefix = args[i+1] |
| |
| elif args[i] == '-t' or args[i] == '--tag': |
| skipthis=1 |
| |
| if not os.access(filename, os.R_OK): |
| print(prefix + 'Last read: %d, limit=%d, count=%d - unreadable' % (0, limit, 0)) |
| sys.exit(1) |
| |
| logfile=open(filename, 'r') |
| logfile.seek(0, os.SEEK_END) |
| newsize=logfile.tell() |
| |
| if offset != 'EOF': |
| offset = int(offset) |
| if newsize >= offset: |
| logfile.seek(offset) |
| else: |
| print(prefix + ('File truncated from %d to %d' % (offset, newsize))) |
| if (newsize*1.05) < offset: |
| logfile.seek(0) |
| # else: we probably just lost a few logs after a fencing op |
| # continue from the new end |
| # TODO: accept a timestamp and discard all messages older than it |
| |
| # Don't block when we reach EOF |
| fcntl.fcntl(logfile.fileno(), fcntl.F_SETFL, os.O_NONBLOCK) |
| |
| count = 0 |
| while True: |
| if logfile.tell() >= newsize: break |
| elif limit and count >= limit: break |
| |
| line = logfile.readline() |
| if not line: break |
| |
| print(line.strip()) |
| count += 1 |
| |
| print(prefix + 'Last read: %d, limit=%d, count=%d' % (logfile.tell(), limit, count)) |
| logfile.close() |
| """ |
| |
| class SearchObj: |
| def __init__(self, filename, host=None, name=None): |
| |
| self.limit = None |
| self.cache = [] |
| self.logger = LogFactory() |
| self.host = host |
| self.name = name |
| self.filename = filename |
| self.rsh = RemoteFactory().getInstance() |
| |
| self.offset = "EOF" |
| |
| if host == None: |
| host = "localhost" |
| |
| def __str__(self): |
| if self.host: |
| return "%s:%s" % (self.host, self.filename) |
| return self.filename |
| |
| def log(self, args): |
| message = "lw: %s: %s" % (self, args) |
| self.logger.log(message) |
| |
| def debug(self, args): |
| message = "lw: %s: %s" % (self, args) |
| self.logger.debug(message) |
| |
| def harvest(self, delegate=None): |
| async = self.harvest_async(delegate) |
| async.join() |
| |
| def harvest_async(self, delegate=None): |
| self.log("Not implemented") |
| raise |
| |
| def end(self): |
| self.debug("Unsetting the limit") |
| # Unset the limit |
| self.limit = None |
| |
| class FileObj(SearchObj): |
| def __init__(self, filename, host=None, name=None): |
| global has_log_watcher |
| SearchObj.__init__(self, filename, host, name) |
| |
| if host is not None: |
| if not host in has_log_watcher: |
| |
| global log_watcher |
| global log_watcher_bin |
| |
| self.debug("Installing %s on %s" % (log_watcher_file, host)) |
| |
| os.system("cat << END >> %s\n%s\nEND" %(log_watcher_file, log_watcher)) |
| os.system("chmod 755 %s" %(log_watcher_file)) |
| |
| self.rsh.cp(log_watcher_file, "root@%s:%s" % (host, log_watcher_bin)) |
| has_log_watcher[host] = 1 |
| |
| os.system("rm -f %s" %(log_watcher_file)) |
| |
| self.harvest() |
| |
| def async_complete(self, pid, returncode, outLines, errLines): |
| for line in outLines: |
| match = re.search("^CTSwatcher:Last read: (\d+)", line) |
| if match: |
| last_offset = self.offset |
| self.offset = match.group(1) |
| #if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(outLines), self.offset)) |
| self.debug("Got %d lines, new offset: %s %s" % (len(outLines), self.offset, repr(self.delegate))) |
| |
| elif re.search("^CTSwatcher:.*truncated", line): |
| self.log(line) |
| elif re.search("^CTSwatcher:", line): |
| self.debug("Got control line: "+ line) |
| else: |
| self.cache.append(line) |
| |
| if self.delegate: |
| self.delegate.async_complete(pid, returncode, self.cache, errLines) |
| |
| def harvest_async(self, delegate=None): |
| self.delegate = delegate |
| self.cache = [] |
| |
| if self.limit != None and self.offset > self.limit: |
| if self.delegate: |
| self.delegate.async_complete(-1, -1, [], []) |
| return None |
| |
| global log_watcher_bin |
| return self.rsh.call_async(self.host, |
| "python %s -t %s -p CTSwatcher: -l 200 -f %s -o %s" % (log_watcher_bin, self.name, self.filename, self.offset), |
| completionDelegate=self) |
| |
| def setend(self): |
| if self.limit: |
| return |
| |
| global log_watcher_bin |
| (rc, lines) = self.rsh(self.host, |
| "python %s -t %s -p CTSwatcher: -l 2 -f %s -o %s" % (log_watcher_bin, self.name, self.filename, "EOF"), |
| None, silent=True) |
| |
| for line in lines: |
| match = re.search("^CTSwatcher:Last read: (\d+)", line) |
| if match: |
| last_offset = self.offset |
| self.limit = int(match.group(1)) |
| #if last_offset == "EOF": self.debug("Got %d lines, new offset: %s" % (len(lines), self.offset)) |
| self.debug("Set limit to: %d" % self.limit) |
| |
| return |
| |
| class JournalObj(SearchObj): |
| |
| def __init__(self, host=None, name=None): |
| SearchObj.__init__(self, name, host, name) |
| self.harvest() |
| |
| def async_complete(self, pid, returncode, outLines, errLines): |
| #self.log( "%d returned on %s" % (pid, self.host)) |
| foundCursor = False |
| for line in outLines: |
| match = re.search("^-- cursor: ([^.]+)", line) |
| if match: |
| foundCursor = True |
| last_offset = self.offset |
| self.offset = match.group(1).strip() |
| self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset)) |
| else: |
| self.cache.append(line) |
| |
| if self.limit and not foundCursor: |
| self.hitLimit = True |
| self.debug("Got %d lines but no cursor: %s" % (len(outLines), self.offset)) |
| |
| # Get the current cursor |
| (rc, outLines) = self.rsh(self.host, "journalctl -q -n 0 --show-cursor", stdout=None, silent=True, synchronous=True) |
| for line in outLines: |
| match = re.search("^-- cursor: ([^.]+)", line) |
| if match: |
| last_offset = self.offset |
| self.offset = match.group(1).strip() |
| self.debug("Got %d lines, new cursor: %s" % (len(outLines), self.offset)) |
| else: |
| self.log("Not a new cursor: %s" % line) |
| self.cache.append(line) |
| |
| if self.delegate: |
| self.delegate.async_complete(pid, returncode, self.cache, errLines) |
| |
| def harvest_async(self, delegate=None): |
| self.delegate = delegate |
| self.cache = [] |
| |
| # Use --lines to prevent journalctl from overflowing the Popen input buffer |
| if self.limit and self.hitLimit: |
| return None |
| |
| elif self.limit: |
| command = "journalctl -q --after-cursor='%s' --until '%s' --lines=200 --show-cursor" % (self.offset, self.limit) |
| else: |
| command = "journalctl -q --after-cursor='%s' --lines=200 --show-cursor" % (self.offset) |
| |
| if self.offset == "EOF": |
| command = "journalctl -q -n 0 --show-cursor" |
| |
| return self.rsh.call_async(self.host, command, completionDelegate=self) |
| |
| def setend(self): |
| if self.limit: |
| return |
| |
| self.hitLimit = False |
| (rc, lines) = self.rsh(self.host, "date +'%Y-%m-%d %H:%M:%S'", stdout=None, silent=True) |
| |
| if (rc == 0) and (len(lines) == 1): |
| self.limit = lines[0].strip() |
| self.debug("Set limit to: %s" % self.limit) |
| else: |
| self.debug("Unable to set limit for %s because date returned %d lines with status %d" % (self.host, |
| len(lines), rc)) |
| |
| return |
| |
| class LogWatcher(RemoteExec): |
| |
| '''This class watches logs for messages that fit certain regular |
| expressions. Watching logs for events isn't the ideal way |
| to do business, but it's better than nothing :-) |
| |
| On the other hand, this class is really pretty cool ;-) |
| |
| The way you use this class is as follows: |
| Construct a LogWatcher object |
| Call setwatch() when you want to start watching the log |
| Call look() to scan the log looking for the patterns |
| ''' |
| |
| def __init__(self, log, regexes, name="Anon", timeout=10, debug_level=None, silent=False, hosts=None, kind=None): |
| '''This is the constructor for the LogWatcher class. It takes a |
| log name to watch, and a list of regular expressions to watch for." |
| ''' |
| self.logger = LogFactory() |
| |
| self.name = name |
| self.regexes = regexes |
| self.debug_level = debug_level |
| self.whichmatch = -1 |
| self.unmatched = None |
| self.cache_lock = threading.Lock() |
| |
| self.file_list = [] |
| self.line_cache = [] |
| |
| # Validate our arguments. Better sooner than later ;-) |
| for regex in regexes: |
| assert re.compile(regex) |
| |
| if kind: |
| self.kind = kind |
| else: |
| raise |
| #self.kind = self.Env["LogWatcher"] |
| |
| if log: |
| self.filename = log |
| else: |
| raise |
| #self.filename = self.Env["LogFileName"] |
| |
| if hosts: |
| self.hosts = hosts |
| else: |
| raise |
| #self.hosts = self.Env["nodes"] |
| |
| if trace_lw: |
| self.debug_level = 3 |
| silent = False |
| |
| if not silent: |
| for regex in self.regexes: |
| self.debug("Looking for regex: "+regex) |
| |
| self.Timeout = int(timeout) |
| self.returnonlymatch = None |
| |
| def debug(self, args): |
| message = "lw: %s: %s" % (self.name, args) |
| self.logger.debug(message) |
| |
| def setwatch(self): |
| '''Mark the place to start watching the log from. |
| ''' |
| |
| if self.kind == "remote": |
| for node in self.hosts: |
| self.file_list.append(FileObj(self.filename, node, self.name)) |
| |
| elif self.kind == "journal": |
| for node in self.hosts: |
| self.file_list.append(JournalObj(node, self.name)) |
| |
| else: |
| self.file_list.append(FileObj(self.filename)) |
| |
| # print("%s now has %d files" % (self.name, len(self.file_list))) |
| |
| def __del__(self): |
| if self.debug_level > 1: self.debug("Destroy") |
| |
| def ReturnOnlyMatch(self, onlymatch=1): |
| '''Specify one or more subgroups of the match to return rather than the whole string |
| http://www.python.org/doc/2.5.2/lib/match-objects.html |
| ''' |
| self.returnonlymatch = onlymatch |
| |
| def async_complete(self, pid, returncode, outLines, errLines): |
| # TODO: Probably need a lock for updating self.line_cache |
| self.logger.debug("%s: Got %d lines from %d (total %d)" % (self.name, len(outLines), pid, len(self.line_cache))) |
| if len(outLines): |
| self.cache_lock.acquire() |
| self.line_cache.extend(outLines) |
| self.cache_lock.release() |
| |
| def __get_lines(self, timeout): |
| count=0 |
| if not len(self.file_list): |
| raise ValueError("No sources to read from") |
| |
| pending = [] |
| #print("%s waiting for %d operations" % (self.name, self.pending)) |
| for f in self.file_list: |
| t = f.harvest_async(self) |
| if t: |
| pending.append(t) |
| |
| for t in pending: |
| t.join(60.0) |
| if t.isAlive(): |
| self.logger.log("%s: Aborting after 20s waiting for %s logging commands" % (self.name, repr(t))) |
| return |
| |
| #print("Got %d lines" % len(self.line_cache)) |
| |
| def end(self): |
| for f in self.file_list: |
| f.end() |
| |
| def look(self, timeout=None, silent=False): |
| '''Examine the log looking for the given patterns. |
| It starts looking from the place marked by setwatch(). |
| This function looks in the file in the fashion of tail -f. |
| It properly recovers from log file truncation, but not from |
| removing and recreating the log. It would be nice if it |
| recovered from this as well :-) |
| |
| We return the first line which matches any of our patterns. |
| ''' |
| if timeout == None: timeout = self.Timeout |
| |
| if trace_lw: |
| silent = False |
| |
| lines=0 |
| needlines=True |
| begin=time.time() |
| end=begin+timeout+1 |
| if self.debug_level > 2: self.debug("starting single search: timeout=%d, begin=%d, end=%d" % (timeout, begin, end)) |
| |
| if not self.regexes: |
| self.debug("Nothing to look for") |
| return None |
| |
| if timeout == 0: |
| for f in self.file_list: |
| f.setend() |
| |
| while True: |
| if len(self.line_cache): |
| lines += 1 |
| |
| self.cache_lock.acquire() |
| line = self.line_cache[0] |
| self.line_cache.remove(line) |
| self.cache_lock.release() |
| |
| which=-1 |
| if re.search("CTS:", line): |
| continue |
| if self.debug_level > 2: self.debug("Processing: "+ line) |
| for regex in self.regexes: |
| which=which+1 |
| if self.debug_level > 3: self.debug("Comparing line to: "+ regex) |
| #import string |
| #matchobj = re.search(string.lower(regex), string.lower(line)) |
| matchobj = re.search(regex, line) |
| if matchobj: |
| self.whichmatch=which |
| if self.returnonlymatch: |
| return matchobj.group(self.returnonlymatch) |
| else: |
| self.debug("Matched: "+line) |
| if self.debug_level > 1: self.debug("With: "+ regex) |
| return line |
| |
| elif timeout > 0 and end < time.time(): |
| if self.debug_level > 1: self.debug("hit timeout: %d" % timeout) |
| |
| timeout = 0 |
| for f in self.file_list: |
| f.setend() |
| |
| else: |
| self.__get_lines(timeout) |
| if len(self.line_cache) == 0 and end < time.time(): |
| self.debug("Single search terminated: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), lines)) |
| return None |
| else: |
| self.debug("Waiting: start=%d, end=%d, now=%d, lines=%d" % (begin, end, time.time(), len(self.line_cache))) |
| time.sleep(1) |
| |
| self.debug("How did we get here") |
| return None |
| |
| def lookforall(self, timeout=None, allow_multiple_matches=None, silent=False): |
| '''Examine the log looking for ALL of the given patterns. |
| It starts looking from the place marked by setwatch(). |
| |
| We return when the timeout is reached, or when we have found |
| ALL of the regexes that were part of the watch |
| ''' |
| |
| if timeout == None: timeout = self.Timeout |
| save_regexes = self.regexes |
| returnresult = [] |
| |
| if trace_lw: |
| silent = False |
| |
| if not silent: |
| self.debug("starting search: timeout=%d" % timeout) |
| for regex in self.regexes: |
| if self.debug_level > 2: self.debug("Looking for regex: "+regex) |
| |
| while (len(self.regexes) > 0): |
| oneresult = self.look(timeout) |
| if not oneresult: |
| self.unmatched = self.regexes |
| self.matched = returnresult |
| self.regexes = save_regexes |
| self.end() |
| return None |
| |
| returnresult.append(oneresult) |
| if not allow_multiple_matches: |
| del self.regexes[self.whichmatch] |
| |
| else: |
| # Allow multiple regexes to match a single line |
| tmp_regexes = self.regexes |
| self.regexes = [] |
| which = 0 |
| for regex in tmp_regexes: |
| matchobj = re.search(regex, oneresult) |
| if not matchobj: |
| self.regexes.append(regex) |
| |
| self.unmatched = None |
| self.matched = returnresult |
| self.regexes = save_regexes |
| return returnresult |
| |