最新文章专题视频专题问答1问答10问答100问答1000问答2000关键字专题1关键字专题50关键字专题500关键字专题1500TAG最新视频文章推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37视频文章20视频文章30视频文章40视频文章50视频文章60 视频文章70视频文章80视频文章90视频文章100视频文章120视频文章140 视频2关键字专题关键字专题tag2tag3文章专题文章专题2文章索引1文章索引2文章索引3文章索引4文章索引5123456789101112131415文章专题3
当前位置: 首页 - 科技 - 知识百科 - 正文

python实现的文件同步服务器实例

来源:动视网 责编:小采 时间:2020-11-27 14:33:50
文档

python实现的文件同步服务器实例

python实现的文件同步服务器实例:本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下: 服务端使用asyncore, 收到文件后保存到本地。 客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。 重点: 1. 使用structs打包发送文件的信息,服务
推荐度:
导读python实现的文件同步服务器实例:本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下: 服务端使用asyncore, 收到文件后保存到本地。 客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。 重点: 1. 使用structs打包发送文件的信息,服务


本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下:

服务端使用asyncore, 收到文件后保存到本地。

客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。

重点:

1. 使用structs打包发送文件的信息,服务端收到后,根据文件信息来接收客户端传送过来的文件。

2. 客户端使用多线程,pyinotify监视到文件变化,放到队列中,由另外一个线程发送。

上代码:

服务端:

# receive file from client and store them into file use asyncore.# 
#/usr/bin/python 
#coding: utf-8 
import asyncore 
import socket 
from socket import errno 
import logging 
import time 
import sys 
import struct 
import os 
import fcntl 
import threading 
from rrd_graph import MakeGraph 
try: 
 import rrdtool 
except (ImportError, ImportWarnning): 
 print "Hope this information can help you:" 
 print "Can not find pyinotify module in sys path, just run [apt-get install python-rrdtool] in ubuntu." 
 sys.exit(1) 
class RequestHandler(asyncore.dispatcher): 
 def __init__(self, sock, map=None, chunk_size=1024): 
 self.logger = logging.getLogger('%s-%s' % (self.__class__.__name__, str(sock.getsockname()))) 
 self.chunk_size = chunk_size 
 asyncore.dispatcher.__init__(self,sock,map) 
 self.data_to_write = list() 
 def readable(self): 
 #self.logger.debug("readable() called.") 
 return True 
 def writable(self): 
 response = (not self.connected) or len(self.data_to_write) 
 #self.logger.debug('writable() -> %s data length -> %s' % (response, len(self.data_to_write))) 
 return response 
 def handle_write(self): 
 data = self.data_to_write.pop() 
 #self.logger.debug("handle_write()->%s size: %s",data.rstrip('
'),len(data)) 
 sent = self.send(data[:self.chunk_size]) 
 if sent < len(data): 
 remaining = data[sent:] 
 self.data_to_write.append(remaining) 
 def handle_read(self): 
 self.writen_size = 0 
 nagios_perfdata = '../perfdata' 
 head_packet_format = "!LL128s128sL" 
 head_packet_size = struct.calcsize(head_packet_format) 
 data = self.recv(head_packet_size) 
 if not data: 
 return 
 filepath_len, filename_len, filepath,filename, filesize = struct.unpack(head_packet_format,data) 
 filepath = os.path.join(nagios_perfdata, filepath[:filepath_len]) 
 filename = filename[:filename_len] 
 self.logger.debug("update file: %s" % filepath + '/' + filename)
 try: 
 if not os.path.exists(filepath): 
 os.makedirs(filepath) 
 except OSError: 
 pass 
 self.fd = open(os.path.join(filepath,filename), 'w') 
 #self.fd = open(filename,'w') 
 if filesize > self.chunk_size: 
 times = filesize / self.chunk_size 
 first_part_size = times * self.chunk_size 
 second_part_size = filesize % self.chunk_size 
 while 1: 
 try: 
 data = self.recv(self.chunk_size) 
 #self.logger.debug("handle_read()->%s size.",len(data)) 
 except socket.error,e: 
 if e.args[0] == errno.EWOULDBLOCK: 
 print "EWOULDBLOCK" 
 time.sleep(1) 
 else: 
 #self.logger.debug("Error happend while receive data: %s" % e) 
 break 
 else: 
 self.fd.write(data) 
 self.fd.flush() 
 self.writen_size += len(data) 
 if self.writen_size == first_part_size: 
 break 
 #receive the packet at last 
 while 1: 
 try: 
 data = self.recv(second_part_size) 
 #self.logger.debug("handle_read()->%s size.",len(data)) 
 except socket.error,e: 
 if e.args[0] == errno.EWOULDBLOCK: 
 print "EWOULDBLOCK" 
 time.sleep(1) 
 else: 
 #self.logger.debug("Error happend while receive data: %s" % e) 
 break 
 else: 
 self.fd.write(data) 
 self.fd.flush() 
 self.writen_size += len(data) 
 if len(data) == second_part_size: 
 break 
 elif filesize <= self.chunk_size: 
 while 1: 
 try: 
 data = self.recv(filesize) 
 #self.logger.debug("handle_read()->%s size.",len(data)) 
 except socket.error,e: 
 if e.args[0] == errno.EWOULDBLOCK: 
 print "EWOULDBLOCK" 
 time.sleep(1) 
 else: 
 #self.logger.debug("Error happend while receive data: %s" % e) 
 break 
 else: 
 self.fd.write(data) 
 self.fd.flush() 
 self.writen_size += len(data) 
 if len(data) == filesize: 
 break 
 self.logger.debug("File size: %s" % self.writen_size) 
class SyncServer(asyncore.dispatcher): 
 def __init__(self,host,port): 
 asyncore.dispatcher.__init__(self) 
 self.debug = True 
 self.logger = logging.getLogger(self.__class__.__name__) 
 self.create_socket(socket.AF_INET,socket.SOCK_STREAM) 
 self.set_reuse_addr() 
 self.bind((host,port)) 
 self.listen(2000) 
 def handle_accept(self): 
 client_socket = self.accept() 
 if client_socket is None: 
 pass 
 else: 
 sock, addr = client_socket 
 #self.logger.debug("Incoming connection from %s" % repr(addr)) 
 handler = RequestHandler(sock=sock) 
class RunServer(threading.Thread): 
 def __init__(self): 
 super(RunServer,self).__init__() 
 self.daemon = False 
 def run(self): 
 server = SyncServer('',9999) 
 asyncore.loop(use_poll=True) 
def StartServer(): 
 logging.basicConfig(level=logging.DEBUG, 
 format='%(name)s: %(message)s', 
 ) 
 RunServer().start() 
 #MakeGraph().start() 
if __name__ == '__main__': 
 StartServer()

客户端:

# monitor path with inotify(python module), and send them to remote server.# 
# use sendfile(2) instead of send function in socket, if we have python-sendfile installed.# 
import socket 
import time 
import os 
import sys 
import struct 
import threading 
import Queue 
try: 
 import pyinotify 
except (ImportError, ImportWarnning): 
 print "Hope this information can help you:" 
 print "Can not find pyinotify module in sys path, just run [apt-get install python-pyinotify] in ubuntu." 
 sys.exit(1) 
try: 
 from sendfile import sendfile 
except (ImportError,ImportWarnning): 
 pass 
filetype_filter = [".rrd",".xml"] 
def check_filetype(pathname): 
 for suffix_name in filetype_filter: 
 if pathname[-4:] == suffix_name: 
 return True 
 try: 
 end_string = pathname.rsplit('.')[-1:][0] 
 end_int = int(end_string) 
 except: 
 pass 
 else: 
 # means pathname endwith digit 
 return False 
class sync_file(threading.Thread): 
 def __init__(self, addr, events_queue): 
 super(sync_file,self).__init__() 
 self.daemon = False 
 self.queue = events_queue 
 self.addr = addr 
 self.chunk_size = 1024 
 def run(self): 
 while 1: 
 event = self.queue.get() 
 if check_filetype(event.pathname): 
 print time.asctime(),event.maskname, event.pathname 
 filepath = event.path.split('/')[-1:][0] 
 filename = event.name 
 filesize = os.stat(os.path.join(event.path, filename)).st_size 
 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
 filepath_len = len(filepath) 
 filename_len = len(filename) 
 sock.connect(self.addr) 
 offset = 0 
 data = struct.pack("!LL128s128sL",filepath_len, filename_len, filepath,filename,filesize) 
 fd = open(event.pathname,'rb') 
 sock.sendall(data) 
 if "sendfile" in sys.modules: 
 # print "use sendfile(2)" 
 while 1: 
 sent = sendfile(sock.fileno(), fd.fileno(), offset, self.chunk_size) 
 if sent == 0: 
 break 
 offset += sent 
 else: 
 # print "use original send function" 
 while 1: 
 data = fd.read(self.chunk_size) 
 if not data: break 
 sock.send(data) 
 sock.close() 
 fd.close() 
class EventHandler(pyinotify.ProcessEvent): 
 def __init__(self, events_queue): 
 super(EventHandler,self).__init__() 
 self.events_queue = events_queue 
 def my_init(self): 
 pass 
 def process_IN_CLOSE_WRITE(self,event): 
 self.events_queue.put(event) 
 def process_IN_MOVED_TO(self,event): 
 self.events_queue.put(event) 
def start_notify(path, mask, sync_server): 
 events_queue = Queue.Queue() 
 sync_thread_pool = list() 
 for i in range(500): 
 sync_thread_pool.append(sync_file(sync_server, events_queue)) 
 for i in sync_thread_pool: 
 i.start() 
 wm = pyinotify.WatchManager() 
 notifier = pyinotify.Notifier(wm,EventHandler(events_queue)) 
 wdd = wm.add_watch(path,mask,rec=True) 
 notifier.loop() 
def do_notify(): 
 perfdata_path = '/var/lib/pnp4nagios/perfdata' 
 mask = pyinotify.IN_CLOSE_WRITE|pyinotify.IN_MOVED_TO 
 sync_server = ('127.0.0.1',9999) 
 start_notify(perfdata_path,mask,sync_server) 
if __name__ == '__main__': 
 do_notify()

python监视线程池

#!/usr/bin/python 
import threading 
import time 
class Monitor(threading.Thread): 
 def __init__(self, *args,**kwargs): 
 super(Monitor,self).__init__() 
 self.daemon = False 
 self.args = args 
 self.kwargs = kwargs 
 self.pool_list = [] 
 def run(self): 
 print self.args 
 print self.kwargs 
 for name,value in self.kwargs.items(): 
 obj = value[0] 
 temp = {} 
 temp[name] = obj 
 self.pool_list.append(temp) 
 while 1: 
 print self.pool_list 
 for name,value in self.kwargs.items(): 
 obj = value[0] 
 parameters = value[1:] 
 died_threads = self.cal_died_thread(self.pool_list,name)
 print "died_threads", died_threads 
 if died_threads >0: 
 for i in range(died_threads): 
 print "start %s thread..." % name 
 t = obj[0].__class__(*parameters) 
 t.start() 
 self.add_to_pool_list(t,name) 
 else: 
 break 
 time.sleep(0.5) 
 def cal_died_thread(self,pool_list,name): 
 i = 0 
 for item in self.pool_list: 
 for k,v in item.items(): 
 if name == k: 
 lists = v 
 for t in lists: 
 if not t.isAlive(): 
 self.remove_from_pool_list(t) 
 i +=1 
 return i 
 def add_to_pool_list(self,obj,name): 
 for item in self.pool_list: 
 for k,v in item.items(): 
 if name == k: 
 v.append(obj) 
 def remove_from_pool_list(self, obj): 
 for item in self.pool_list: 
 for k,v in item.items(): 
 try: 
 v.remove(obj) 
 except: 
 pass 
 else: 
 return

使用方法:

rrds_queue = Queue.Queue() 
 make_rrds_pool = [] 
 for i in range(5): 
 make_rrds_pool.append(MakeRrds(rrds_queue)) 
 for i in make_rrds_pool: 
 i.start() 
 make_graph_pool = [] 
 for i in range(5): 
 make_graph_pool.append(MakeGraph(rrds_queue)) 
 for i in make_graph_pool: 
 i.start() 
 monitor = Monitor(make_rrds_pool=(make_rrds_pool, rrds_queue), 
 make_graph_pool=(make_graph_pool, rrds_queue)) 
 monitor.start()

解析:

1. 接受字典参数,value为一个元组,第一个元素是线程池,后面的都是参数。
2. 每0.5秒监视线程池中的线程数量,如果线程死掉了,记录死掉线程的数目,再启动同样数量的线程。
3. 如果没有线程死去,则什么也不做。

从外部调用Django模块

import os 
import sys 
sys.path.insert(0,'/data/cloud_manage') 
from django.core.management import setup_environ 
import settings 
setup_environ(settings) 
from common.monitor import Monitor 
from django.db import connection, transaction

前提就是,要新建一个django的project,这里我们新建了一个cloud_manage.
这样不仅可以调用django自身的模块,还能调用project本身的东西。

希望本文所述对大家的Python程序设计有所帮助。

文档

python实现的文件同步服务器实例

python实现的文件同步服务器实例:本文实例讲述了python实现的文件同步服务器。分享给大家供大家参考。具体实现方法如下: 服务端使用asyncore, 收到文件后保存到本地。 客户端使用pyinotify监视目录的变化 ,把变动的文件发送到服务端。 重点: 1. 使用structs打包发送文件的信息,服务
推荐度:
  • 热门焦点

最新推荐

猜你喜欢

热门推荐

专题
Top