python3之SQLAlchemy

发布于 2018-08-22  231 次阅读


直接放代码
首先要创建一个实体类ProxyMain
SqlHelper中create_db方法是将所有实体类映射到数据库中
最下面有SqlHelper使用方法

pip3 install sqlalchemy

from sqlalchemy import Column, String, DateTime,Integer,VARCHAR,create_engine,MetaData,Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import datetime
import os
from config import DATABASE_PATH
from enum import IntEnum

class ProxyType(IntEnum):
    高匿 = 0,
    匿名 = 1,
    透明 = 2,
class ProxyProtocol(IntEnum):
    http = 0,
    https = 1,
    http_https = 2,

BaseModel = declarative_base()

class ProxyMain(BaseModel):
    __tablename__ = 'Proxy_Main'

    id = Column(Integer, primary_key=True, autoincrement=True)
    ip = Column(String(15))
    port = Column(Integer,nullable=False)
    speed = Column(Integer,nullable=False,default=1)
    country = Column(VARCHAR(50), nullable=False)
    area = Column(VARCHAR(50), nullable=False)
    createdatetime = Column(DateTime(), default=datetime.datetime.utcnow)
    checkdatetime = Column(DateTime(), default=datetime.datetime.utcnow)
    score = Column(Integer, nullable=False, default=10)
    type = Column(Integer, nullable=False,default=0)
    protocol = Column(Integer, nullable=False, default=0)

# 初始化数据库连接:
engine = create_engine('sqlite:///' + DATABASE_PATH, echo=False, connect_args={'check_same_thread': False})
# 创建DBSession类型:
DBSession = sessionmaker(bind=engine)

class SqlHelper(object):
    params = {'ip': ProxyMain.ip, 'port': ProxyMain.port, 'type': ProxyMain.type, 'protocol': ProxyMain.protocol,
              'country': ProxyMain.country, 'area': ProxyMain.area, 'score': ProxyMain.score}
    @staticmethod
    def create_db():
        BaseModel.metadata.create_all(engine)

    @staticmethod
    def drop_db():
        BaseModel.metadata.drop_all(engine)

    @staticmethod
    def execute(sql):
        conn = engine.connect()
        conn.execute(sql)
        conn.close()

    @staticmethod
    def query(sql,count=0):
        conn = engine.connect()
        if count == 0:
            result = conn.execute(sql).fetchall()
        else:
            result = conn.execute(sql).fetchmany(count)
        conn.close()
        fields=[]
        for field in ProxyMain.__dict__:
            if '_' not in field:
                fields.append(field)
        dict_result = []
        for row in result:
            temp={}
            #例如 ('id',3)
            for column in row.items():
                if(column[0] in SqlHelper.params.keys()):
                    temp[column[0]] = column[1]
            dict_result.append(temp)
        return dict_result

    @staticmethod
    def add(model):
        session = DBSession()
        new_model = ProxyMain(ip=model["ip"],port=model["port"],speed=model["speed"],type=model["type"],\
            protocol=model["protocol"],country=model["country"],area=model["area"])
        row_affect = session.add(new_model)
        session.commit()
        return row_affect

    @staticmethod
    def get(count=None,conditions=None):
        session = DBSession()
        conditionlist = []
        if conditions:
            for key in list(conditions.keys()):
                if(SqlHelper.params.get(key,None)):
                    conditionlist.append(SqlHelper.params.get(key) == conditions.get(key))
        query = session.query(ProxyMain).order_by(ProxyMain.score.desc())
        for c in conditionlist:
            query = query.filter(c)
        if count:
            return query.limit(count).all()
        else:
            return query.all()

    @staticmethod
    def update(model,conditions):
        session = DBSession()
        conditionlist = []
        for key in list(conditions.keys()):
            if(SqlHelper.params.get(key,None)):
                conditionlist.append(SqlHelper.params.get(key) == conditions.get(key))
        query = session.query(ProxyMain)
        for c in conditionlist:
            query = query.filter(c)
        updatevalue = {}
        for key in list(model.keys()):
            if SqlHelper.params.get(key, None):
                updatevalue[SqlHelper.params.get(key, None)] = model.get(key)
        row_affect = query.update(model)
        session.commit()
        return row_affect

    @staticmethod
    def delete(conditions):
        session = DBSession()
        conditionlist = []
        for key in list(conditions.keys()):
            if(SqlHelper.params.get(key,None)):
                conditionlist.append(SqlHelper.params.get(key) == conditions.get(key))
        query = session.query(ProxyMain)
        for c in conditionlist:
            query = query.filter(c)
        row_affect = query.delete()
        session.commit()
        return row_affect


#SqlHelper使用
#增
model = {'ip':ip,'port':port,'speed':speed,'type':type,'protocol':protocol,'country':country,'area':area}
SqlHelper.add(model)
#删 参数为dic类型 为条件
SqlHelper.delete({'ip':proxy.ip,'port':proxy.port})
#改 第一个参数为dic类型 为要更新的字段的值,第二个参数为dic类型,条件
SqlHelper.update({'score':proxy.score - 1,'checkdatetime':datetime.now()},{'ip':proxy.ip,'port':proxy.port})
#查
proxies = SqlHelper.get()
proxies = SqlHelper.get(10,{'country':'中国'})
#自定义SQL语句执行
SqlHelper.execute('delete from Proxy_Main where Score < 5')
#自定义语句查询 返回字典集合
dict_result = SqlHelper.query('select * from Proxy_Main')
dict_result = SqlHelper.query('select * from Proxy_Main',10)

LoneKing