直接放代码
首先要创建一个实体类ProxyMain
SqlHelper 中的 create_db 方法会将所有实体类映射到数据库中(即创建对应的数据表)。
SqlHelper 的使用方法见代码最下方的示例。
# Requires SQLAlchemy:  pip3 install sqlalchemy
from sqlalchemy import (
    Column,
    String,
    DateTime,
    Integer,
    VARCHAR,
    create_engine,
    MetaData,
    Table,
    text,
)
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import datetime
import os
from contextlib import contextmanager
from config import DATABASE_PATH
from enum import IntEnum


class ProxyType(IntEnum):
    """Anonymity level of a proxy.

    NOTE(review): presumably these are the integers stored in
    ``ProxyMain.type`` -- confirm against the callers.
    """

    高匿 = 0  # high anonymity (elite)
    匿名 = 1  # anonymous
    透明 = 2  # transparent


class ProxyProtocol(IntEnum):
    """Protocol(s) supported by a proxy.

    NOTE(review): presumably these are the integers stored in
    ``ProxyMain.protocol`` -- confirm against the callers.
    """

    http = 0
    https = 1
    http_https = 2


BaseModel = declarative_base()


class ProxyMain(BaseModel):
    """ORM entity mapped to the ``Proxy_Main`` table."""

    __tablename__ = 'Proxy_Main'

    id = Column(Integer, primary_key=True, autoincrement=True)
    ip = Column(String(15))
    port = Column(Integer, nullable=False)
    speed = Column(Integer, nullable=False, default=1)
    country = Column(VARCHAR(50), nullable=False)
    area = Column(VARCHAR(50), nullable=False)
    # Both timestamps default to the (naive) UTC time of the insert.
    createdatetime = Column(DateTime(), default=datetime.datetime.utcnow)
    checkdatetime = Column(DateTime(), default=datetime.datetime.utcnow)
    score = Column(Integer, nullable=False, default=10)
    type = Column(Integer, nullable=False, default=0)
    protocol = Column(Integer, nullable=False, default=0)


# Initialise the database connection.
engine = create_engine('sqlite:///' + DATABASE_PATH, echo=False,
                       connect_args={'check_same_thread': False})
# Create the session factory.
DBSession = sessionmaker(bind=engine)


@contextmanager
def _session_scope():
    """Yield a new session; roll back on error and always close it."""
    session = DBSession()
    try:
        yield session
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


class SqlHelper(object):
    """Static CRUD helpers for the ``ProxyMain`` entity."""

    # Keys accepted in ``conditions`` dicts (and kept by ``query``),
    # mapped to the ORM attribute they refer to.
    params = {
        'ip': ProxyMain.ip,
        'port': ProxyMain.port,
        'type': ProxyMain.type,
        'protocol': ProxyMain.protocol,
        'country': ProxyMain.country,
        'area': ProxyMain.area,
        'score': ProxyMain.score,
    }

    @staticmethod
    def _build_filters(conditions):
        """Turn a ``{name: value}`` dict into equality filter expressions.

        Keys that are not listed in ``SqlHelper.params`` are silently
        ignored (this matches the historical behaviour).
        """
        filters = []
        for key, value in conditions.items():
            column = SqlHelper.params.get(key)
            # Explicit ``is not None``: never rely on the truthiness of an
            # ORM attribute / SQL expression.
            if column is not None:
                filters.append(column == value)
        return filters

    @staticmethod
    def _as_statement(sql):
        """Wrap a plain SQL string in ``text()``; pass anything else through.

        SQLAlchemy 2.x no longer executes bare strings.  Note that inside
        ``text()`` a ``:name`` token is a bind parameter; write ``\\:`` for a
        literal colon.
        """
        return text(sql) if isinstance(sql, str) else sql

    @staticmethod
    def create_db():
        """Create the tables of every entity registered on ``BaseModel``."""
        BaseModel.metadata.create_all(engine)

    @staticmethod
    def drop_db():
        """Drop the tables of every entity registered on ``BaseModel``."""
        BaseModel.metadata.drop_all(engine)

    @staticmethod
    def execute(sql):
        """Execute a custom SQL statement, discarding any result.

        ``engine.begin()`` commits on success and rolls back on error, and
        the connection is released even if the statement raises.
        """
        with engine.begin() as conn:
            conn.execute(SqlHelper._as_statement(sql))

    @staticmethod
    def query(sql, count=0):
        """Run a custom SQL query and return the rows as a list of dicts.

        :param sql: SQL string (or SQLAlchemy statement) to execute.
        :param count: maximum number of rows to fetch; ``0`` fetches all.
        :return: one dict per row, restricted to the column names listed in
            ``SqlHelper.params``.
        """
        with engine.connect() as conn:
            result = conn.execute(SqlHelper._as_statement(sql))
            rows = result.fetchall() if count == 0 else result.fetchmany(count)

        allowed = SqlHelper.params
        dict_result = []
        for row in rows:
            # SQLAlchemy >= 1.4 exposes the dict view as ``row._mapping``;
            # older RowProxy objects provide ``items()`` themselves.
            mapping = getattr(row, '_mapping', row)
            dict_result.append(
                {name: value for name, value in mapping.items()
                 if name in allowed}
            )
        return dict_result

    @staticmethod
    def add(model):
        """Insert one proxy record.

        :param model: dict with the keys ``ip``, ``port``, ``speed``,
            ``type``, ``protocol``, ``country`` and ``area``.
        :return: the number of rows inserted (always ``1``).  Previously the
            result of ``Session.add`` was returned, which is always ``None``.
        :raises KeyError: if a required key is missing from ``model``.
        """
        new_model = ProxyMain(
            ip=model["ip"],
            port=model["port"],
            speed=model["speed"],
            type=model["type"],
            protocol=model["protocol"],
            country=model["country"],
            area=model["area"],
        )
        with _session_scope() as session:
            session.add(new_model)
            session.commit()
        return 1

    @staticmethod
    def get(count=None, conditions=None):
        """Fetch proxies ordered by descending score.

        :param count: maximum number of rows; falsy means no limit.
        :param conditions: optional ``{name: value}`` equality filters; only
            names listed in ``SqlHelper.params`` are applied.
        :return: list of ``ProxyMain`` instances.  They are detached from the
            session on return; their already-loaded column values stay
            readable.
        """
        filters = SqlHelper._build_filters(conditions) if conditions else []
        with _session_scope() as session:
            query = (session.query(ProxyMain)
                     .order_by(ProxyMain.score.desc())
                     .filter(*filters))
            if count:
                query = query.limit(count)
            return query.all()

    @staticmethod
    def update(model, conditions):
        """Update every row matching ``conditions``.

        :param model: ``{column name: new value}`` dict, handed to
            ``Query.update`` unchanged.
        :param conditions: ``{name: value}`` equality filters; only names
            listed in ``SqlHelper.params`` are applied, so a dict without
            any recognised key updates every row.
        :return: number of rows matched by the update.
        """
        filters = SqlHelper._build_filters(conditions)
        with _session_scope() as session:
            row_affect = (session.query(ProxyMain)
                          .filter(*filters)
                          .update(model))
            session.commit()
        return row_affect

    @staticmethod
    def delete(conditions):
        """Delete every row matching ``conditions``.

        :param conditions: ``{name: value}`` equality filters; only names
            listed in ``SqlHelper.params`` are applied, so a dict without
            any recognised key deletes every row.
        :return: number of rows deleted.
        """
        filters = SqlHelper._build_filters(conditions)
        with _session_scope() as session:
            row_affect = session.query(ProxyMain).filter(*filters).delete()
            session.commit()
        return row_affect


# ---------------------------------------------------------------------------
# SqlHelper usage (illustrative only: ip, port, speed, type, protocol,
# country, area and proxy are placeholders supplied by the caller).
#
# Create
#   model = {'ip': ip, 'port': port, 'speed': speed, 'type': type,
#            'protocol': protocol, 'country': country, 'area': area}
#   SqlHelper.add(model)
#
# Delete -- the argument is a dict of conditions
#   SqlHelper.delete({'ip': proxy.ip, 'port': proxy.port})
#
# Update -- first dict: fields to set, second dict: conditions
#   SqlHelper.update({'score': proxy.score - 1,
#                     'checkdatetime': datetime.datetime.now()},
#                    {'ip': proxy.ip, 'port': proxy.port})
#
# Read
#   proxies = SqlHelper.get()
#   proxies = SqlHelper.get(10, {'country': '中国'})
#
# Execute a custom SQL statement
#   SqlHelper.execute('delete from Proxy_Main where Score < 5')
#
# Run a custom query -- returns a list of dicts
#   dict_result = SqlHelper.query('select * from Proxy_Main')
#   dict_result = SqlHelper.query('select * from Proxy_Main', 10)
# ---------------------------------------------------------------------------