pysftp SFTP Pool

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

import pysftp
import os
import sys
import time
import paramiko
from queue import Queue, Empty

# -----------------------------


from etc import config
import utils
from utils import logger_web
from utils import logger_lms as logger


# -----------------------------
broker=config.LMS["broker"]
notifier_list=config.Notifier
notifier_list2=config.Notifier2

# -----------------------------

class SFTPConnectionPool:
    """SFTP连接池"""
    def __init__(self,host,port,username,password,max_connections=20):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.max_connections = max_connections
        self.pool=Queue(maxsize=max_connections)
        self.cnopts = paramiko.SSHClient()
        self.cnopts.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        # self.cnopts.hostkeys = None
        self._init_pool()

    def _init_pool(self):
        for _ in range(self.max_connections):
            conn=self._create_conn()
            if conn:
                self.pool.put(conn)
            else:
                logger.error("初始化SFTP连接池失败")

    def _create_conn(self):
        transport=paramiko.Transport((self.host,self.port))
        transport.connect(username=self.username,password=self.password)
        return paramiko.SFTPClient.from_transport(transport)

    def get_conn(self):
        try:
            return self.pool.get(timeout=10)
        except Empty:
            logger.error("SFTPConnPool no avaiable connections ...")

    def release_conn(self,conn):
        if conn:
            self.pool.put(conn) # 将对象放回连接池
        else:
            logger.warning("SFTPConnPool attempted to release a None connection")

    def close_all_conn(self):
        while not self.pool.empty():
            conn=self.pool.get()
            if conn:
                conn.close()
            else:
                logger.warning("SFTPConnPool Found a None connection in the pool")




class SFTPClient:
    def __init__(self, host,port, username, password, upload_dir, download_dir):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.upload_dir = upload_dir
        self.download_dir = download_dir
        self.max_retries = 3
        self.retry_delay = 5
        self.pool=SFTPConnectionPool(host,port,username,password)

    def ensure_connected(self):
        """确保SFTP连接是活跃的,如果断开则重连"""
        sftp=self.pool.get_conn()
        if not sftp:
            return False
        try:
            sftp.listdir()
            return True
        except Exception as e:
            logger.error("SFTPClient connection check failed, error={}".format(e))
            self.pool.release_conn(sftp)
            time.sleep(3)
            return False

    def upload_file(self, local_file):
        logger.info(f"SFTPClient start upload file={local_file} to remote_path={self.upload_dir}")
        if not os.path.exists(local_file):
            logger.error("SFTPClient UploadFile ={} does not exist".format(local_file))
            return False
        for attempt in range(self.max_retries):
            try:
                if not self.ensure_connected():
                    continue
                sftp=self.pool.get_conn()
                sftp.chdir(self.upload_dir)
                filename = os.path.basename(local_file)
                sftp.put(local_file) # /root/project/lms_jpm/static/files/xxx_request.csv
                logger.info(f"SFTPClient Successfully upload file={filename} to remote_path={self.upload_dir}")
                return True
            except Exception as e:
                logger.error(f"SFTPClient Failed upload file={local_file}, attempt {attempt+1} error={e}")
                if attempt < self.max_retries-1:
                    logger.error(f"SFTPClient Retrying upload file={local_file}, seconds ...")
                    time.sleep(self.retry_delay)
        return False

    def search_file(self, file):
        logger.info(f"SFTPClient start search remote_path={self.download_dir}/{file} ...")
        for attempt in range(self.max_retries):
            try:
                self.ensure_connected()
                self.conn.chdir(self.download_dir)
                files = self.conn.listdir()
                # logger.info(f"----------listdir-----------------{self.download_dir}---------------------------")
                # logger.info(f"-------------------------{file}---------------------{attempt+1}--------------------")
                # logger.info(files)
                # logger.info("--------------------------------------------------------------------")
                # logger.info("--------------------------------------------------------------------")
                # logger.info("--------------------------------------------------------------------")
                if file in files:
                    # logger.info(f"SFTPClient search remote_path={self.download_dir}/{file} ****** find ******")
                    return True
                return False
            except Exception as e:
                logger.error(f"SFTPClient search failed remote_path={self.download_dir}/{file}"+
                    f", attempt= {attempt+1}, error={e}")
                if attempt < self.max_retries-1:
                    time.sleep(self.retry_delay)
        return False

    def download_file(self, file, local_path):
        logger.info(f"SFTPClient start download remote_file={file} to {local_path}")
        for attempt in range(self.max_retries):
            try:
                if not self.ensure_connected():
                    continue
                sftp=self.pool.get_conn()
                sftp.chdir(self.download_dir)
                sftp.get(file, local_path)
                logger.info(f"SFTPClient Successfully downloaded {file} to {local_path}")
                return True
            except Exception as e:
                logger.error(f"SFTPClient Download failed remote_file={file}, attempt= {attempt+1}, error={e}")
                if attempt < self.max_retries-1:
                    time.sleep(self.retry_delay)
                    if os.path.exists(f"{local_path}/{file}"):
                        os.remove(f"{local_path}/{file}")  # 删除可能的部分下载文件
        return False