
10. MySQL exercise [tax calculation]

10.1. Importing a text file into a MySQL table

The following script imports the data from the text file below:

12620:13190:15640:24740:31810:39970:48360:55790:92970:127860:151250:172040:195000:0
0:0.05:0.1:0.15:0.2:0.25:0.3:0.35:0.4:0.45:0.5:0.55:0.6:0.65
0:631:1290.5:272.5:3309.5:4900:6898.5:9316.5:12106:16754.5:23147.5:30710:39312:49062

into the [impots] table of the MySQL database [dbimpots].

The connection to the [dbimpots] database uses the credentials (root, "").


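Program (impotstxt2mysql)

We will use the following architecture:

The [console] script we are going to write uses the [ImpotsFile] class to access the data in the text file. Writing to the database relies on the techniques discussed earlier.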
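The [ImpotsFile] class itself belongs to section 8.1 and is not reproduced here. As a rough sketch of what reading such a file involves, the function below parses three colon-separated lines into three lists. The name parse_impots_text is hypothetical, and the sample only uses the first three columns of the file shown above.

```python
def parse_impots_text(text):
    """Parse three colon-separated lines into (limites, coeffR, coeffN)."""
    # keep only the non-empty lines
    lines = [line.strip() for line in text.splitlines() if line.strip()]
    if len(lines) != 3:
        raise ValueError("expected 3 non-empty lines, got %d" % len(lines))
    # each line becomes a list of floats
    limites, coeffR, coeffN = [
        [float(value) for value in line.split(":")] for line in lines
    ]
    if not (len(limites) == len(coeffR) == len(coeffN)):
        raise ValueError("the three lines must hold the same number of values")
    return limites, coeffR, coeffN


# first three columns of the data file shown above
SAMPLE = """12620:13190:15640
0:0.05:0.1
0:631:1290.5
"""

limites, coeffR, coeffN = parse_impots_text(SAMPLE)
print(limites)
print(coeffR)
print(coeffN)
```

The real class may validate its input differently; the point is only that the file yields three parallel lists of equal length.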

The script code is as follows:

#    -*- coding=utf-8 -*-

#    import the module containing the Impots classes
from impots import *
import re
import sys
import MySQLdb

# --------------------------------------------------------------------------
def copyToMysql(limites,coeffR,coeffN,HOTE,USER,PWD,BASE,TABLE):
    #  copy the 3 numeric arrays limites, coeffR, coeffN
    #  into table TABLE of the mysql database BASE
    #  the mysql database is on machine HOTE
    #  the user is identified by USER and PWD

    #  put the SQL queries in a list
    #    drop the table
    requetes=["drop table %s" % (TABLE)]
    #    create the table
    requete="create table %s (limites decimal(10,2), coeffR decimal(6,2), coeffN decimal(10,2))" % (TABLE)
    requetes.append(requete)
    #    fill the table
    for i in range(len(limites)):
        #  insert query
        requetes.append("insert into %s (limites,coeffR,coeffN) values (%s,%s,%s)" % (TABLE,limites[i],coeffR[i],coeffN[i]))
    #    execute the SQL statements: no screen tracking, do not stop on errors
    #    (the drop fails when the table does not exist yet)
    return executerCommandes(HOTE,USER,PWD,BASE,requetes,False,False)


def executerCommandes(HOTE,ID,PWD,BASE,requetes,suivi=False,arret=True):
    #    uses the connection (HOTE,ID,PWD,BASE)
    #  executes on this connection the SQL commands contained in the list requetes
    #  if suivi=True, each execution of a SQL command is displayed, indicating success or failure
    #  if arret=True, the function stops on the 1st error encountered, otherwise it executes all the SQL commands
    #  the function returns a list (no. of errors, error1, error2, ...)

    #    connection
    try:
        connexion=MySQLdb.connect(host=HOTE,user=ID,passwd=PWD,db=BASE)
    except MySQLdb.OperationalError,erreur:
        return [1,"Erreur lors de la connexion a MySQL sous l'identite (%s,%s,%s,%s) : %s" % (HOTE, ID, PWD, BASE, erreur)]

    #  a cursor is requested
    curseur=connexion.cursor()
    #  execute SQL requests contained in the request list
    #    we run them - initially no errors
    erreurs=[0]
    for i in range(len(requetes)):
        #  put the query in a local variable
        requete=requetes[i]
        #    do we have an empty query?
        if re.match(r"^\s*$",requete):
            continue
        #    query execution i
        erreur=""
        try:
            curseur.execute(requete)
        except Exception, erreur:
            pass
        #    was there an error?
        if erreur:
            #    one more error
            erreurs[0]+=1
            #  error msg (the queries come from a list, so there is no end-of-line character to strip)
            msg="%s : Erreur (%s)" % (requete,erreur)
            erreurs.append(msg)
            #  screen tracking or not?
            if suivi:
                print msg
            #    shall we stop?
            if arret:
                return erreurs
        else:
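            if suivi:
                #  the query is displayed without its end-of-line marker
                #  (cutNewLineChar and afficherInfos are not defined in this listing;
                #  they are assumed to be the helpers of the section 9.7 script)
                print "%s : Execution reussie" % (cutNewLineChar(requete))
                #  information on the result of the query
                afficherInfos(curseur)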
    #    commit the transaction and close the connection
    try:
        connexion.commit()
        connexion.close()
    except MySQLdb.OperationalError,erreur:
        #    one more error
        erreurs[0]+=1
        #  error msg
        msg="%s : Erreur (%s)" % (requete,erreur)
        erreurs.append(msg)

    #    return
    return erreurs


#    ------------------------------------------------ main
#    user identity
USER="root"
PWD=""
#  the DBMS host machine
HOTE="localhost"
#    database name
BASE="dbimpots"
#    name of the data table
TABLE="impots"
#  the data file
IMPOTS="impots.txt"

#    instantiate the [dao] layer
try:
    dao=ImpotsFile(IMPOTS)
except (IOError, ImpotsError) as infos:
    print ("Une erreur s'est produite : {0}".format(infos))
    sys.exit()

#  transfer the recovered data to a mysql table
erreurs=copyToMysql(dao.limites,dao.coeffR,dao.coeffN,HOTE,USER,PWD,BASE,TABLE)
if erreurs[0]:
    for i in range(1,len(erreurs)):
        print erreurs[i]
else:
    print "Transfert opere"
#    end
sys.exit()

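Notes:

  • in the main section, we instantiate the [ImpotsFile] class presented in section 8.1;
  • the arrays limites, coeffR and coeffN are then transferred to the MySQL database by the copyToMysql function;
  • copyToMysql builds a list of queries to execute and hands it to the executerCommandes function;
  • executerCommandes is the function already presented in section 9.7, with one difference: the queries are no longer stored in a text file but in a list.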
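The "build a list of queries, then run them while collecting errors" pattern described in these notes can be tried without a MySQL server. The sketch below is an illustration only: it swaps MySQLdb for Python's built-in sqlite3 module (an assumption made so the example is self-contained), and build_queries and run_queries are hypothetical names mirroring copyToMysql and executerCommandes.

```python
import sqlite3


def build_queries(table, limites, coeffR, coeffN):
    """Return the list of SQL statements: drop, create, then one insert per row."""
    queries = ["drop table %s" % table]
    queries.append(
        "create table %s (limites decimal(10,2), coeffR decimal(6,2), "
        "coeffN decimal(10,2))" % table
    )
    for lim, r, n in zip(limites, coeffR, coeffN):
        queries.append(
            "insert into %s (limites,coeffR,coeffN) values (%s,%s,%s)"
            % (table, lim, r, n)
        )
    return queries


def run_queries(connection, queries, stop_on_error=True):
    """Run each statement; return [error count, message1, message2, ...]."""
    errors = [0]
    cursor = connection.cursor()
    for query in queries:
        # skip empty statements
        if not query.strip():
            continue
        try:
            cursor.execute(query)
        except sqlite3.Error as error:
            errors[0] += 1
            errors.append("%s : Erreur (%s)" % (query, error))
            if stop_on_error:
                return errors
    connection.commit()
    return errors


# sqlite3 in-memory database used as a stand-in for the MySQL server
connection = sqlite3.connect(":memory:")
queries = build_queries("impots", [12620, 13190], [0, 0.05], [0, 631])
errors = run_queries(connection, queries, stop_on_error=False)
rows = connection.execute("select limites,coeffR,coeffN from impots").fetchall()
print(errors)
print(rows)
```

On an empty database the initial drop fails and is counted as one error, yet the remaining statements still run because stop_on_error is False. This mirrors why copyToMysql passes arret=False.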

Results

Text file impots.txt

12620:13190:15640:24740:31810:39970:48360:55790:92970:127860:151250:172040:195000:0
0:0.05:0.1:0.15:0.2:0.25:0.3:0.35:0.4:0.45:0.5:0.55:0.6:0.65
0:631:1290.5:272.5:3309.5:4900:6898.5:9316.5:12106:16754.5:23147.5:30710:39312:49062

Screen output:

Transfert opere

Verification with phpMyAdmin:

 

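10.2. The tax calculation program

Now that the data needed to compute the tax is stored in the database, we can write the tax calculation script. We again use a three-layer architecture:

The new [dao] layer connects to the MySQL DBMS and is implemented by the [ImpotsMySQL] class. It offers the [business] layer the same interface as before: a single getData method that returns the tuple (limites, coeffR, coeffN). The [business] layer therefore remains unchanged from the previous version.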
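To illustrate why the [business] layer can stay unchanged, here is a minimal sketch in which two interchangeable [dao] implementations expose the same getData method. All three class names are hypothetical, and BracketSummary is only a stand-in: it is not the tax calculation of the real [business] layer.

```python
class InMemoryDao(object):
    """Hypothetical [dao] implementation that keeps the data in memory."""

    def __init__(self, limites, coeffR, coeffN):
        self.limites = list(limites)
        self.coeffR = list(coeffR)
        self.coeffN = list(coeffN)

    def getData(self):
        return (self.limites, self.coeffR, self.coeffN)


class LoaderDao(object):
    """Hypothetical [dao] implementation that obtains the data through a
    callable, standing in for a file or database access."""

    def __init__(self, loader):
        self.limites, self.coeffR, self.coeffN = loader()

    def getData(self):
        return (self.limites, self.coeffR, self.coeffN)


class BracketSummary(object):
    """Stand-in for the [business] layer: it depends only on getData()."""

    def __init__(self, dao):
        self.limites, self.coeffR, self.coeffN = dao.getData()

    def summary(self):
        # number of rows and highest value of coeffR
        return (len(self.limites), max(self.coeffR))


# first three columns of the data file used in this chapter
values = ([12620, 13190, 15640], [0, 0.05, 0.1], [0, 631, 1290.5])
from_memory = BracketSummary(InMemoryDao(*values))
from_loader = BracketSummary(LoaderDao(lambda: values))
print(from_memory.summary() == from_loader.summary())
```

Because the consumer only calls getData, swapping the data source requires no change on its side.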

10.3. The [ImpotsMySQL] class

The [dao] layer is now implemented by the following [ImpotsMySQL] class (located in the impots.py file):

class ImpotsMySQL:

    #    constructor
    def __init__(self,HOTE,USER,PWD,BASE,TABLE):
        #    initializes the attributes limites, coeffR, coeffN
        #  the data required to calculate the tax has been placed in the mysql table TABLE
        #  belonging to the BASE database. The table has the following structure
        #    limites decimal(10,2), coeffR decimal(6,2), coeffN decimal(10,2)
        #  the connection to the mysql database on machine HOTE is made as (USER,PWD)
        #  throws an exception if an error occurs

        #    connection to mysql database
        connexion=MySQLdb.connect(host=HOTE,user=USER,passwd=PWD,db=BASE)
        #  a cursor is requested
        curseur=connexion.cursor()

        #    read the whole table TABLE
        requete="select limites,coeffR,coeffN from %s" % (TABLE)
        #  execute the query [requete] through the cursor [curseur]
        curseur.execute(requete)
        #    process the query result
        ligne=curseur.fetchone()
        self.limites=[]
        self.coeffR=[]
        self.coeffN=[]
        while(ligne):
            #  current line
            self.limites.append(ligne[0])
            self.coeffR.append(ligne[1])
            self.coeffN.append(ligne[2])
            #  next line
            ligne=curseur.fetchone()
        #    disconnect
        connexion.close()

    def getData(self):
        return (self.limites, self.coeffR, self.coeffN)

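Notes:

  • the SQL SELECT query retrieves the data from the MySQL database. The rows it returns are then processed one at a time with [curseur.fetchone] (called once before the loop and once at the end of each iteration) to build the limites, coeffR and coeffN arrays;
  • the getData method implements the interface of the [dao] layer.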
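The row-by-row fetchone loop can be exercised without MySQL. The sketch below assumes Python's built-in sqlite3 module as a stand-in for MySQLdb (both follow the DB-API cursor model); read_table is a hypothetical helper, and the two sample rows are the first two columns of the data file.

```python
import sqlite3


def read_table(connection, table):
    """Read the whole table row by row into three parallel lists."""
    cursor = connection.cursor()
    cursor.execute("select limites,coeffR,coeffN from %s" % table)
    limites, coeffR, coeffN = [], [], []
    # fetchone() returns None once every row has been consumed
    row = cursor.fetchone()
    while row is not None:
        limites.append(row[0])
        coeffR.append(row[1])
        coeffN.append(row[2])
        row = cursor.fetchone()
    return limites, coeffR, coeffN


# sqlite3 in-memory database used as a stand-in for the MySQL server
connection = sqlite3.connect(":memory:")
connection.execute(
    "create table impots (limites decimal(10,2), coeffR decimal(6,2), "
    "coeffN decimal(10,2))"
)
connection.executemany(
    "insert into impots (limites,coeffR,coeffN) values (?,?,?)",
    [(12620, 0, 0), (13190, 0.05, 631)],
)
data = read_table(connection, "impots")
connection.close()
print(data)
```

Testing the row against None rather than relying on truthiness makes the end-of-result condition explicit.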

10.4. The console script

The code of the console script (impots_04) is as follows:

#    -*- coding=utf-8 -*-

#    import of Impots* class module
from impots import *
import sys

#    ------------------------------------------------ main
#    user identity
USER="root"
PWD=""
#  the DBMS host machine
HOTE="localhost"
#    database name
BASE="dbimpots"
#    name of the data table
TABLE="impots"
#    input file
DATA="data.txt"
#    output file
RESULTATS="resultats.txt"

#    instantiate the [metier] layer on top of the MySQL-backed [dao] layer
try:
    metier=ImpotsMetier(ImpotsMySQL(HOTE,USER,PWD,BASE,TABLE))
except (IOError, MySQLdb.Error, ImpotsError) as infos:
    print ("Une erreur s'est produite : {0}".format(infos))
    sys.exit()

#  the taxpayer data has been placed in the DATA file
#  one line per taxpayer in the form
#    married,children,salary

#    open the data file
try:
    data=open(DATA,"r")
except IOError:
    print "Impossible d'ouvrir en lecture le fichier des donnees [{0}]".format(DATA)
    sys.exit()

#    open results file
try:
    resultats=open(RESULTATS,"w")
except IOError:
    print "Impossible de creer le fichier des résultats [{0}]".format(RESULTATS)
    sys.exit()

#    utilities
u=Utilitaires()

#  use the current line of the data file
ligne=data.readline()
while(ligne != ''):
    #  remove any end-of-line marker
    ligne=u.cutNewLineChar(ligne)
    #  we retrieve the 3 fields married,children,salary which form the line
    (marie,enfants,salaire)=ligne.split(",")
    enfants=int(enfants)
    salaire=int(salaire)
    #  tax calculation
    impot=metier.calculer(marie,enfants,salaire)
    #  enter the result
    resultats.write("{0}:{1}:{2}:{3}\n".format(marie,enfants,salaire,impot))
    #  a new line is read
    ligne=data.readline()
#    close files
data.close()
resultats.close()

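Notes:

  • the [dao] and [business] layers are instantiated in a single statement, ImpotsMetier(ImpotsMySQL(...));
  • the rest of the code is already familiar.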
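The per-line processing of the data file can be summarized by the sketch below. It is an illustration under assumptions: process_lines and fake_calculer are hypothetical names, the two input lines are made-up samples, and fake_calculer is a placeholder that always returns 0, not the tax rule of the [business] layer.

```python
def process_lines(lines, calculer):
    """Turn 'married,children,salary' lines into 'married:children:salary:tax'."""
    results = []
    for line in lines:
        # remove any end-of-line marker
        line = line.rstrip("\r\n")
        if not line:
            continue
        marie, enfants, salaire = line.split(",")
        enfants = int(enfants)
        salaire = int(salaire)
        impot = calculer(marie, enfants, salaire)
        results.append("{0}:{1}:{2}:{3}".format(marie, enfants, salaire, impot))
    return results


def fake_calculer(marie, enfants, salaire):
    # placeholder only: the real computation lives in the [business] layer
    return 0


# hypothetical sample input lines
sample = ["oui,2,200000\n", "non,0,100000\n"]
output = process_lines(sample, fake_calculer)
print(output)
```

Passing the calculation as a parameter keeps the file handling independent of the [business] layer, which is the same separation the console script relies on.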

Results

Identical to those of the previous versions of this exercise.