#!/usr/bin/python
# This script will ingest all pcap files in a specified directory tree, then
# use SANCP[1] to generate session data. Fields deemed interesting will be
# inserted to the database.
# Use pkt_to_db_load.py -h for more detailed usage.
#
# WARNING: This script is pretty embarrassing. Please don't laugh.
#          It was designed as a proof of concept for a demo, not high-grade use
#          in an enterprise environment. Suggestions welcome, bugfixes not
#          promised.
#
# Known pitfalls:
# - Undetermined at this time :)
#
# Version 0.01
#
# By: Phil Hagen (phil < at > lewestech.com)
# (C) 2011 Lewes Technology Consulting, LLC
# http://stuffphilwrites.com
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Database schema expected (note: the INSERT below targets `traffic_flow`;
# the deployed table name must match):
# CREATE TABLE `traffic_flow` (
#   `interesting` enum('y','n','-') NOT NULL default '-',
#   `sancp_id` bigint(64) unsigned NOT NULL,
#   `start_time_gmt` datetime NOT NULL,
#   `stop_time_gmt` datetime NOT NULL,
#   `eth_proto` smallint(16) unsigned NOT NULL,
#   `ip_proto` tinyint(8) unsigned NOT NULL,
#   `src_ip` int(32) unsigned NOT NULL,
#   `src_port` smallint(16) unsigned NOT NULL,
#   `dst_ip` int(32) unsigned NOT NULL,
#   `dst_port` smallint(16) unsigned NOT NULL,
#   `duration` int(10) unsigned NOT NULL,
#   `src_pkts` bigint(64) unsigned NOT NULL,
#   `dst_pkts` bigint(64) unsigned NOT NULL,
#   `src_bytes` bigint(64) unsigned NOT NULL,
#   `dst_bytes` bigint(64) unsigned NOT NULL,
#   PRIMARY KEY (`sancp_id`),
#   KEY `src_ip` (`src_ip`),
#   KEY `dst_ip` (`dst_ip`),
#   KEY `dst_port` (`dst_port`),
#   KEY `src_port` (`src_port`)
# );

import subprocess
import MySQLdb
import os
import glob
import sys
from optparse import OptionParser
from time import asctime
import tempfile
import shutil
import magic

import pdb  # insert this to break into the debugger: pdb.set_trace()

# set up options and load from commandline
parser = OptionParser()
parser.add_option('-d', '--dir', dest='sourceDir',
                  help='Directory containing source files (default: %s)' % ('./'),
                  default='./')
parser.add_option('-q', action='store_false', dest='verboseMode',
                  help='Quiet mode (default)', default=False)
parser.add_option('-v', action='store_true', dest='verboseMode',
                  help='Verbose mode')
parser.add_option('-t', action='store_true', dest='testMode',
                  help="Test mode: Don't perform any sql inserts (implies -v)",
                  default=False)
(options, args) = parser.parse_args()

# set up some variables
sourceDir = options.sourceDir
verboseMode = options.verboseMode
testMode = options.testMode
if testMode:
    verboseMode = True

logFileName = os.path.join(os.path.expandvars('$HOME'), 'pkt_to_db_load.log')
logFile = open(logFileName, 'a')

totalFiles = 0
processedFiles = 1      # 1-based: shown to the user as "(n/total)"
totalInserts = 0
globalErrors = False

# NOTE(review): hard-coded credentials; consider moving them to a protected
# configuration file rather than committing them to source.
dbConn = MySQLdb.connect(host='localhost', user='root', passwd='password',
                         db='mfr10038_hagen')


def handle_pcap(inputFile, workingDir):
    """Run sancp over one pcap file and load the resulting session records.

    inputFile:  path to the (temporary copy of the) pcap file
    workingDir: scratch directory; sancp output goes in a 'sancp' subdir
                which is removed before returning
    """
    global totalInserts, globalErrors

    if verboseMode:
        print(' - distilling .pcap into connection data')

    sancpDirName = os.path.join(workingDir, 'sancp')
    os.mkdir(sancpDirName)

    # Build the command as an argument list: no shell is involved, so a
    # hostile filename cannot be used for command injection.
    sancpCommand = ['sancp', '-H', '-R', '-P', '-r', inputFile,
                    '-d', sancpDirName]
    sancpProc = subprocess.Popen(sancpCommand, stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
    stdOutValue, stdErrValue = sancpProc.communicate()
    returnCode = sancpProc.returncode

    if returnCode != 0:
        logFile.write('ERROR: sancp process returned error.\n'
                      'input file: %s\nReturn Value: %d\nstdout: %s\nstderr: %s\n'
                      % (inputFile, returnCode, stdOutValue, stdErrValue))
        globalErrors = True
    else:
        # Columns inserted, in the order they are sliced out of each sancp
        # output line below.  (Fixed: 'dst_port' previously carried a
        # trailing space, corrupting the generated column list.)
        fieldList = ['sancp_id', 'start_time_gmt', 'stop_time_gmt',
                     'eth_proto', 'ip_proto', 'src_ip', 'src_port',
                     'dst_ip', 'dst_port', 'duration', 'src_pkts',
                     'dst_pkts', 'src_bytes', 'dst_bytes']
        # Parameterized INSERT: the driver escapes and quotes every value,
        # which both prevents SQL injection from crafted packet data and
        # removes the need to hand-quote the datetime/IP fields.
        sqlInsert = ('INSERT INTO `traffic_flow` (%s) VALUES (%s)'
                     % (','.join(fieldList),
                        ','.join(['%s'] * len(fieldList))))

        for sancpFile in glob.glob(os.path.join(sancpDirName, 'stats.any.*')):
            sancpResults = open(sancpFile, 'r')
            linesInserted = 0
            sys.stdout.write(' - Inserting to db: ')
            sys.stdout.flush()

            for resultLine in sancpResults:
                allConnectionData = resultLine.split('|')

                # keep only the fields named in fieldList, skipping the
                # sancp columns we do not store
                connectionData = []
                connectionData.extend(allConnectionData[0:3])
                connectionData.extend(allConnectionData[4:11])
                connectionData.extend(allConnectionData[12:16])

                try:
                    if not testMode:
                        dbCursor.execute(sqlInsert, connectionData)
                    linesInserted += 1
                except MySQLdb.IntegrityError as message:
                    # 1062 = MySQL duplicate-key error: this flow was
                    # already loaded, so silently skip it
                    if message.args[0] == 1062:
                        pass
                    else:
                        print('ERROR! Unhandled exception.')
                        print(str(message))
                        sys.exit(2)

                # lightweight progress indicator: one dot per 100 rows
                if linesInserted % 100 == 0:
                    sys.stdout.write('.')
                    sys.stdout.flush()

            sancpResults.close()
            os.remove(sancpFile)

            totalInserts = totalInserts + linesInserted
            dbConn.commit()
            print(' Committed %d records to db (%d total)'
                  % (linesInserted, totalInserts))

    shutil.rmtree(sancpDirName)


def handle_input(sourceDirName):
    """Walk sourceDirName, copy each file into the temp working area,
    identify it with libmagic, and hand pcap files to handle_pcap()."""
    global totalFiles, globalErrors, processedFiles

    m = magic.Magic()

    # Count every file in the whole tree up front so the "(n/total)"
    # progress display is accurate.  (The original only counted the first
    # directory os.walk() yielded; its own TODO flagged this as broken.)
    for rootDir, dirNames, fileNames in os.walk(sourceDirName):
        totalFiles += len(fileNames)

    for rootDir, dirNames, fileNames in os.walk(sourceDirName):
        for inputFileName in fileNames:
            fullFilePath = os.path.join(rootDir, inputFileName)
            tempFileName = os.path.join(tempDirName, inputFileName)

            if verboseMode:
                print('- processing file: %s (%d/%d)'
                      % (fullFilePath, processedFiles, totalFiles))

            # never work on the original - create a copy first
            shutil.copy2(fullFilePath, tempFileName)

            # identify the file; just take the first token of the magic
            # description (e.g. "tcpdump capture file ..." -> "tcpdump")
            # TODO: will probably require more extensive testing if we go
            #       beyond pcap inputs
            fileType = m.from_file(tempFileName).split(' ')[0].lower()

            if fileType == 'tcpdump':
                # pcap file
                if verboseMode:
                    print(' - pcap')
                handle_pcap(tempFileName, tempDirName)
            else:
                # Unknown type: log it and keep going rather than aborting
                # the whole ingest (the original sys.exit(2) here threw away
                # all remaining work; its own comment suggested this fix).
                logFile.write('UNKNOWN FILE TYPE! -\n\tfilename: %s\n\ttype: %s\n'
                              % (tempFileName, fileType))
                globalErrors = True

            # the temp copy is no longer needed either way
            os.remove(tempFileName)
            print('')
            processedFiles = processedFiles + 1


###############################################################################

if testMode:
    print('TESTING load')
    logFile.write('%s: started TEST PROCESSING\nSource Directory: %s\n'
                  % (asctime(), os.path.abspath(sourceDir)))
else:
    print('processing data load')
    logFile.write('%s: started processing\nSource Directory: %s\n'
                  % (asctime(), os.path.abspath(sourceDir)))

dbCursor = dbConn.cursor()

# create a temp dir to use as a home base
tempDirName = tempfile.mkdtemp(prefix='pktload_')

# handle_input() does the heavy lifting; always clean up afterwards
try:
    handle_input(sourceDir)
finally:
    if not testMode:
        dbCursor.close()
    # rmtree instead of rmdir: a stray temp copy left behind after an
    # error must not make cleanup itself fail
    shutil.rmtree(tempDirName, ignore_errors=True)

print('Committed %d records\n' % (totalInserts))
logFile.write('Inserted %d records\n'
              '----------------------------------------\n' % totalInserts)
logFile.close()

if globalErrors:
    print('WARNING! One or more errors were encountered during this ingest.\n'
          'Consult the log file for details.\nLog file is: %s' % (logFileName))