最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

python实现封装得到virustotal扫描结果

来源:动视网 责编:小采 时间:2020-11-27 14:40:47
文档

python实现封装得到virustotal扫描结果

python实现封装得到virustotal扫描结果:本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下: import simplejson import urllib import urllib2 import os, sys import logging try: import sqlite3 excep
推荐度:
导读python实现封装得到virustotal扫描结果:本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下: import simplejson import urllib import urllib2 import os, sys import logging try: import sqlite3 excep


本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下:

import simplejson 
import urllib 
import urllib2 
import os, sys 
import logging 
 
try: 
 import sqlite3 
except ImportError: 
 sys.stderr.write("ERROR: Unable to locate Python SQLite3 module. " 
 "Please verify your installation. Exiting...
") 
 sys.exit(-1) 
 
MD5 = "5248f774d2ee0a10936d0b1dc89107f1" 
MD5 = "12fa5fb74201d9b6a14f63fbf9a81ff6" #do not have report on virustotal.com 
 
 
APIKEY = "xxxxxxxxxxxxxxxxxx"用自己的 

class VirusTotalDatabase: 
 """ 
 Database abstraction layer. 
 """ 
 def __init__(self, db_file): 
 log = logging.getLogger("Database.Init") 
 self.__dbfile = db_file 
 self._conn = None 
 self._cursor = None 
 
 # Check if SQLite database already exists. If it doesn't exist I invoke 
 # the generation procedure. 
 if not os.path.exists(self.__dbfile): 
 if self._generate(): 
 print("Generated database "%s" which didn't" 
 " exist before." % self.__dbfile) 
 else: 
 print("Unable to generate database") 
 
 # Once the database is generated of it already has been, I can 
 # initialize the connection. 
 try: 
 self._conn = sqlite3.connect(self.__dbfile) 
 self._cursor = self._conn.cursor() 
 except Exception, why: 
 print("Unable to connect to database "%s": %s." 
 % (self.__dbfile, why)) 
 
 log.debug("Connected to SQLite database "%s"." % self.__dbfile) 
 
 def _generate(self): 
 """ 
 Creates database structure in a SQLite file. 
 """ 
 if os.path.exists(self.__dbfile): 
 return False 
 
 db_dir = os.path.dirname(self.__dbfile) 
 if not os.path.exists(db_dir): 
 try: 
 os.makedirs(db_dir) 
 except (IOError, os.error), why: 
 print("Something went wrong while creating database " 
 "directory "%s": %s" % (db_dir, why)) 
 return False 
 
 conn = sqlite3.connect(self.__dbfile) 
 cursor = conn.cursor() 
 
 cursor.execute("CREATE TABLE virustotal (
" 
 " id INTEGER PRIMARY KEY,
" 
 " md5 TEXT NOT NULL,
" 
 " Kaspersky TEXT DEFAULT NULL,
" 
 " McAfee TEXT DEFAULT NULL,
" 
 " Symantec TEXT DEFAULT NULL,
" 
 " Norman TEXT DEFAULT NULL,
" 
 " Avast TEXT DEFAULT NULL,
" 
 " NOD32 TEXT DEFAULT NULL,
" 
 " BitDefender TEXT DEFAULT NULL,
" 
 " Microsoft TEXT DEFAULT NULL,
" 
 " Rising TEXT DEFAULT NULL,
" 
 " Panda TEXT DEFAULT NULL
" 
 ");") 
 print "create db:%s sucess" % self.__dbfile 
 
 return True 
 
 def _get_task_dict(self, row): 
 try: 
 task = {} 
 task["id"] = row[0] 
 task["md5"] = row[1] 
 task["Kaspersky"] = row[2] 
 task["McAfee"] = row[3] 
 task["Symantec"] = row[4] 
 task["Norman"] = row[5] 
 task["Avast"] = row[6] 
 task["NOD32"] = row[7] 
 task["BitDefender"] = row[8] 
 task["Microsoft"] = row[9] 
 task["Rising"] = row[10] 
 task["Panda"] = row[11] 
 return task 
 except Exception, why: 
 return None 
 
 def add_sample(self, md5, virus_dict): 
 """ 
 
 """ 
 task_id = None 
 
 if not self._cursor: 
 return None 
 if not md5 or md5 == "": 
 return None 
 
 Kaspersky = virus_dict.get("Kaspersky", None) 
 McAfee = virus_dict.get("McAfee", None) 
 Symantec = virus_dict.get("Symantec", None) 
 Norman = virus_dict.get("Norman", None) 
 Avast = virus_dict.get("Avast", None) 
 NOD32 = virus_dict.get("NOD32", None) 
 BitDefender = virus_dict.get("BitDefender", None) 
 Microsoft = virus_dict.get("Microsoft", None) 
 Rising = virus_dict.get("Rising", None) 
 Panda = virus_dict.get("Panda", None) 
 
 self._conn.text_factory = str 
 try: 
 self._cursor.execute("SELECT id FROM virustotal WHERE md5 = ?;", 
 (md5,)) 
 sample_row = self._cursor.fetchone() 
 except sqlite3.OperationalError, why: 
 print "sqlite3 error:%s
" % str(why) 
 return False 
 
 if sample_row: 
 try: 
 sample_row = sample_row[0] 
 self._cursor.execute("UPDATE virustotal SET Kaspersky=?, McAfee=?, Symantec=?, Norman=?, Avast=?, 
 NOD32=?, BitDefender=?, Microsoft=?, Rising=?, Panda=? WHERE id = ?;", 
 (Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, Microsoft, 
 Rising, Panda, sample_row)) 
 self._conn.commit() 
 task_id = sample_row 
 except sqlite3.OperationalError, why: 
 print("Unable to update database: %s." % why) 
 return False 
 else: #the sample not in the database 
 try: 
 self._cursor.execute("INSERT INTO virustotal " 
 "(md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
 Microsoft, Rising, Panda) " 
 "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", 
 (md5, Kaspersky, McAfee, Symantec, Norman, Avast, NOD32, BitDefender, 
 Microsoft, Rising, Panda)) 
 self._conn.commit() 
 task_id = self._cursor.lastrowid 
 except sqlite3.OperationalError, why: 
 print "why",str(why) 
 return None 
 print "add_to_db:%s, task_id:%s" % (str(self.__dbfile), str(task_id)) 
 return task_id 
 
 def get_sample(self): 
 """ 
 Gets a task from pending queue. 
 """ 
 log = logging.getLogger("Database.GetTask") 
 
 if not self._cursor: 
 log.error("Unable to acquire cursor.") 
 return None 
 
 # Select one item from the queue table with higher priority and older 
 # addition date which has not already been processed. 
 try: 
 self._cursor.execute("SELECT * FROM virustotal " 
 #"WHERE lock = 0 " 
 #"AND status = 0 " 
 "ORDER BY id, added_on LIMIT 1;") 
 except sqlite3.OperationalError, why: 
 log.error("Unable to query database: %s." % why) 
 return None 
 
 sample_row = self._cursor.fetchone() 
 
 if sample_row: 
 return self._get_task_dict(sample_row) 
 else: 
 return None 
 
 def search_md5(self, md5): 
 """ 
 
 """ 
 if not self._cursor: 
 return None 
 
 if not md5 or len(md5) != 32: 
 return None 
 
 try: 
 self._cursor.execute("SELECT * FROM virustotal " 
 "WHERE md5 = ? " 
 #"AND status = 1 " 
 "ORDER BY id DESC;", 
 (md5,)) 
 except sqlite3.OperationalError, why: 
 return None 
 
 task_dict = {} 
 for row in self._cursor.fetchall(): 
 task_dict = self._get_task_dict(row) 
 #if task_dict: 
 #tasks.append(task_dict) 
 
 return task_dict 
 
 
 
class VirusTotal: 
 """""" 
 
 def __init__(self, md5): 
 """Constructor""" 
 self._virus_dict = {} 
 self._md5 = md5 
 self._db_file = r"./db/virustotal.db" 
 self.get_report_dict() 
 
 def repr(self): 
 return str(self._virus_dict) 
 
 def submit_md5(self, file_path): 
 import postfile 
 #submit the file 
 FILE_NAME = os.path.basename(file_path) 
 
 
 host = "www.virustotal.com" 
 selector = "https://www.virustotal.com/vtapi/v2/file/scan" 
 fields = [("apikey", APIKEY)] 
 file_to_send = open(file_path, "rb").read() 
 files = [("file", FILE_NAME, file_to_send)] 
 json = postfile.post_multipart(host, selector, fields, files) 
 print json 
 pass 
 
 def get_report_dict(self): 
 result_dict = {} 
 
 url = "https://www.virustotal.com/vtapi/v2/file/report" 
 parameters = {"resource": self._md5, 
 "apikey": APIKEY} 
 data = urllib.urlencode(parameters) 
 req = urllib2.Request(url, data) 
 response = urllib2.urlopen(req) 
 json = response.read() 
 
 response_dict = simplejson.loads(json) 
 if response_dict["response_code"]: #has result 
 scans_dict = response_dict.get("scans", {}) 
 for anti_virus_comany, virus_name in scans_dict.iteritems(): 
 if virus_name["detected"]: 
 result_dict.setdefault(anti_virus_comany, virus_name["result"]) 
 return result_dict 
 
 def write_to_db(self): 
 """""" 
 db = VirusTotalDatabase(self._db_file) 
 virus_dict = self.get_report_dict() 
 db.add_sample(self._md5, virus_dict) 

使用方法如下:

config = {'input':"inputMd5s"} 
fp = open(config['input'], "r") 
content = fp.readlines() 
MD5S = [] 
for md5 in ifilter(lambda x:len(x)>0, imap(string.strip, content)): 
 MD5S.append(md5) 
print "MD5S",MD5S 
fp.close() 
 
 
from getVirusTotalInfo import VirusTotal 
#得到扫描结果并写入数库 
for md5 in MD5S: 
 virus_total = VirusTotal(md5) 
 virus_total.write_to_db() 

希望本文所述对大家的Python程序设计有所帮助。

文档

python实现封装得到virustotal扫描结果

python实现封装得到virustotal扫描结果:本文实例讲述了python实现封装得到virustotal扫描结果的方法。分享给大家供大家参考。具体方法如下: import simplejson import urllib import urllib2 import os, sys import logging try: import sqlite3 excep
推荐度:
标签: 扫描 封装 结果
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top