24. Practice Exercise: Version 7

24.1. Introduction

Version 7 of the tax calculation application is identical to version 6 except for the following details:

  • the web client will send multiple HTTP requests simultaneously. In the previous version, these requests were sent sequentially, so the server only ever had to process one request at a time;
  • the server will be multi-threaded: it will be able to process multiple requests simultaneously;
  • to track the execution of these requests, the web server will be equipped with a logger that records key moments of request processing in a text file;
  • the server will send an email to the application administrator when it encounters a problem that prevents it from starting, typically an issue with the database associated with the web server.

The application architecture remains unchanged:

Image

The directory structure of the scripts is as follows:

Image

The [http-servers/02] folder is first created by copying the [http-servers/01] folder. Modifications are then made to it.

24.2. The utilities

Image

24.2.1. The [Logger] class

The [Logger] class will allow certain web server actions to be logged to a text file:


import codecs
import threading
from datetime import date, datetime
from threading import current_thread

from ImpôtsError import ImpôtsError


class Logger:
    # class attribute
    lock = threading.RLock()

    # constructor
    def __init__(self, logs_filename: str):
        try:
            # open the file in append mode (a)
            self.__resource = codecs.open(logs_filename, "a", "utf-8")
        except BaseException as error:
            raise ImpôtsError(18, f"{error}")

    # Write a log
    def write(self, message: str):
        # current date and time
        today = date.today()
        now = datetime.time(datetime.now())
        # thread name
        thread_name = current_thread().name
        # we don't want to be interrupted while writing to the log file
        # we request the synchronization object (= the lock) from the class—only one thread will obtain it
        Logger.lock.acquire()
        try:
            # writing the log
            self.__resource.write(f"{today} {now}, {thread_name} : {message}")
            # Write immediately—otherwise, the text will only be written when the write stream is closed
            # but we want to track the logs over time
            self.__resource.flush()
        finally:
            # release the synchronization object (= the lock) so that another thread can acquire it
            Logger.lock.release()

    # release resources
    def close(self):
        # close the file
        if self.__resource:
            self.__resource.close()
  • Lines 10–11: We define a class attribute. A class attribute is a property shared by all instances of the class. It is referenced using the notation [Class.class_attribute] (lines 30, 39). The class attribute [lock] will serve as a synchronization object for all threads executing the code in lines 31–36;
  • lines 14–19: The constructor receives the absolute path of the log file. This file is then opened, and the file descriptor retrieved is stored in the class;
  • line 17: the log file is opened in ‘append’ mode (a). Each line written will be appended to the end of the file;
  • lines 22–39: the [write] method allows a message passed as a parameter to be written to the log file. Two pieces of information are appended to this message:
    • line 24: the current date;
    • line 25: the current time;
    • line 27: the name of the thread writing the log. It is important to remember here that a web application serves multiple users simultaneously. Each request is assigned a thread to execute it. If this thread is paused—typically for an I/O operation (network, files, database)—then the processor is handed over to another thread. Because of these possible interruptions, we cannot be sure that a thread will succeed in writing a line to the log file without being interrupted. There is therefore a risk that logs from two different threads might get mixed up. The risk is low, perhaps even zero, but we have nevertheless decided to show how to synchronize two threads’ access to a shared resource, in this case the log file;
  • line 30: before writing, the thread requests the key to the entry door. The requested key is the one created on line 11. It is indeed unique: a class attribute is unique for all instances of the class;
    • At time T1, a thread named Thread1 obtains the key. It can then execute line 33;
    • At time T2, thread Thread1 is paused before it has even finished writing the log;
    • At time T3, thread Thread2, which has acquired the processor, must also write a log. It thus reaches line 30, where it requests the front door key. It is told that another thread already has it. It is then automatically paused. This will be the case for all threads that request this key;
    • At time T4, thread Thread1, which had been paused, regains the processor. It then finishes writing the log;
  • lines 32–36: writing to the log file occurs in two steps:
    • line 33: the file descriptor obtained on line 17 works with a buffer. The [write] operation on line 33 writes to this buffer, not directly to the file. The buffer is flushed to the file under certain conditions:
      • the buffer is full;
      • the file descriptor undergoes a [close] or [flush] operation;
    • line 36: we force the log line to be written to the file. We do this because we want to see the logs from the different threads interleaved. Otherwise, the logs from a single thread would all be written at the same time, when the descriptor is closed on line 45. It would then be much harder to see that certain threads had been paused: we would have to check the timestamps in the logs;
  • line 39: the Thread1 thread returns the lock that was given to it. It can now be given to another thread;
  • line 22: the [write] method is therefore synchronized: only one thread at a time writes to the log file. The key to the mechanism is line 30: no matter what happens, only one thread retrieves the key to proceed to the next line. It keeps it until it returns it (line 39);
  • lines 41–45: the [close] method releases the resources allocated to the log file descriptor;
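The synchronization described above can also be written with a `with` block, which acquires the lock and releases it automatically, even if an exception occurs. The following is only a minimal sketch: `MiniLogger` and `run_demo` are hypothetical names introduced for illustration, and the class is a simplified stand-in for [Logger], not a replacement for it.

```python
import os
import tempfile
import threading


class MiniLogger:
    # hypothetical, simplified stand-in for the [Logger] class
    # class attribute: one lock shared by all instances
    lock = threading.RLock()

    def __init__(self, filename: str):
        # append mode, as in [Logger]
        self._resource = open(filename, "a", encoding="utf-8")

    def write(self, message: str):
        thread_name = threading.current_thread().name
        # 'with' acquires the lock and always releases it on exit
        with MiniLogger.lock:
            self._resource.write(f"{thread_name} : {message}\n")
            # flush before releasing the lock so the line reaches the file whole
            self._resource.flush()

    def close(self):
        self._resource.close()


def run_demo(nb_threads: int = 5, nb_lines: int = 20) -> list:
    # temporary log file shared by all threads
    fd, path = tempfile.mkstemp(suffix=".txt")
    os.close(fd)

    def worker():
        # each thread uses its own logger instance on the same file
        logger = MiniLogger(path)
        for i in range(nb_lines):
            logger.write(f"line {i}")
        logger.close()

    threads = [threading.Thread(target=worker) for _ in range(nb_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # read back what was written, then clean up
    with open(path, encoding="utf-8") as file:
        lines = file.read().splitlines()
    os.remove(path)
    return lines


lines = run_demo()
```

Each thread opens its own file descriptor, yet every line in `lines` should be complete because a line is written and flushed while the shared class-level lock is held.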

The logs written to the log file will look like this:

2020-07-22 20:03:52.992152, Thread-2: …

24.2.2. The [SendAdminMail] class

The [SendAdminMail] class allows you to send a message to the application administrator when the application crashes.

Image

The [SendAdminMail] class is configured in the [config] script [2] as follows:


        # SMTP server configuration
        "adminMail": {
            # SMTP server
            "smtp-server": "localhost",
            # SMTP server port
            "smtp-port": "25",
            # administrator
            "from": "guest@localhost.com",
            "to": "guest@localhost.com",
            # email subject
            "subject": "Tax calculation server crash",
            # Set TLS to True if the SMTP server requires authentication, False otherwise
            "tls": False
        }

The [SendAdminMail] class receives the dictionary from lines 2–13 as its email-sending configuration. The class is as follows:


# imports
import smtplib
from email.mime.text import MIMEText
from email.utils import formatdate


class SendAdminMail:

    # -----------------------------------------------------------------------
    @staticmethod
    def send(config: dict, message: str, verbose: bool = False):
        # sends message to the SMTP server config['smtp-server'] on port config['smtp-port']
        # if config['tls'] is true, TLS support will be used
        # the email is sent from config['from']
        # to the recipient config['to']
        # the message has the subject config['subject']
        # the logger reference is found in config['logger']

        # Retrieve the logger from the configuration—may be None
        logger = config["logger"]
        # SMTP server
        server = None
        # Send the message
        try:
            # SMTP server
            server = smtplib.SMTP(config["smtp-server"], int(config["smtp-port"]))
            # verbose mode
            server.set_debuglevel(verbose)
            # secure connection?
            if config['tls']:
                # start security negotiation
                server.starttls()
                # authentication
                server.login(config["user"], config["password"])
            # Build the plain-text message (MIMEText) that will be sent
            msg = MIMEText(message)
            msg['From'] = config["from"]
            msg['To'] = config["to"]
            msg['Date'] = formatdate(localtime=True)
            msg['Subject'] = config["subject"]
            # send the message
            server.send_message(msg)
            # log - the logger may not exist
            if logger:
                logger.write(f"[SendAdminMail] Message sent to [{config['to']}]: [{message}]\n")
        except BaseException as error:
            # log - the logger may not exist
            if logger:
                logger.write(
                    f"[SendAdminMail] Error [{error}] while sending message [{message}] to [{config['to']}]: \n")
        finally:
            # we're done - we free the resources used by the function
            if server:
                server.quit()
  • lines 24-54: this is the code already covered in example |smtp/02|;
  • line 20: we retrieve the reference of a logger. This is used on lines 45 and 49;
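The message construction step can be tried on its own, without any SMTP server. This is a minimal sketch: the function name `build_admin_message` and the `sample_config` dictionary are assumptions introduced for the example, reusing the keys of the [adminMail] configuration.

```python
from email.mime.text import MIMEText
from email.utils import formatdate


def build_admin_message(config: dict, message: str) -> MIMEText:
    # hypothetical helper: builds the same headers as SendAdminMail.send
    msg = MIMEText(message)
    msg["From"] = config["from"]
    msg["To"] = config["to"]
    msg["Date"] = formatdate(localtime=True)
    msg["Subject"] = config["subject"]
    return msg


# sample configuration (same keys as the [adminMail] dictionary)
sample_config = {
    "from": "guest@localhost.com",
    "to": "guest@localhost.com",
    "subject": "Tax calculation server crash",
}
msg = build_admin_message(sample_config, "database unreachable")
# text that would be handed to the SMTP server
raw_message = msg.as_string()
```

Printing `raw_message` shows the headers followed by the body, which is a convenient way to check the message before involving a mail server.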

24.3. The web server

24.3.1. Configuration

The server configuration is very similar to that of the server discussed previously. Only the [config.py] file has changed slightly:


def configure(config: dict) -> dict:
    import os

    # step 1 ------

    # directory of this file
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # root directory
    root_dir = "C:/Data/st-2020/dev/python/cours-2020/python3-flask-2020"

    # absolute dependencies
    absolute_dependencies = [
        # project directories
        # BaseEntity, MyException
        f"{root_dir}/classes/02/entities",
        # TaxDaoInterface, TaxBusinessInterface, TaxUiInterface
        f"{root_dir}/impots/v04/interfaces",
        # AbstractTaxDao, TaxConsole, TaxBusiness
        f"{root_dir}/impots/v04/services",
        # TaxDaoWithAdminDataInDatabase
        f"{root_dir}/impots/v05/services",
        # AdminData, TaxError, TaxPayer
        f"{root_dir}/impots/v04/entities",
        # Constants, tax brackets
        f"{root_dir}/impots/v05/entities",
        # IndexController
        f"{root_dir}/impots/http-servers/01/controllers",
        # scripts [config_database, config_layers]
        script_dir,
        # Logger, SendAdminMail
        f"{script_dir}/../utilities",
    ]
    # Set the syspath
    from myutils import set_syspath
    set_syspath(absolute_dependencies)

    # Step 2 ------
    # application configuration
    config.update({
        # users authorized to use the application
        "users": [
            {
                "login": "admin",
                "password": "admin"
            }
        ],
        # log file
        "logsFilename": f"{script_dir}/../data/logs/logs.txt",
        # SMTP server configuration
        "adminMail": {
            # SMTP server
            "smtp-server": "localhost",
            # SMTP server port
            "smtp-port": "25",
            # administrator
            "from": "guest@localhost.com",
            "to": "guest@localhost.com",
            # email subject
            "subject": "Tax calculation server crash",
            # Set TLS to True if the SMTP server requires authentication, False otherwise
            "tls": False
        },
        # thread pause duration in seconds
        "sleep_time": 0
    })

    # Step 3 ------
    # database configuration
    import config_database
    config["database"] = config_database.configure(config)

    # step 4 ------
    # instantiating the application layers
    import config_layers
    config['layers'] = config_layers.configure(config)

    # we return the configuration
    return config
  • lines 40–66: We add elements related to the logger (line 49) and those related to sending an alert email to the application administrator (lines 51–63) to the server’s configuration dictionary;
  • line 65: to better see the threads in action, we will force some of them to pause. [sleep_time] is the duration of the pause expressed in seconds;
  • lines 27–28: Note that we are using the [index_controller] from the previous version 6;
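The implementation of [set_syspath] from [myutils] is not shown in this chapter. As a rough idea of what such a helper might do, here is a sketch under that assumption: `set_syspath_sketch` is a hypothetical name, and the real function may behave differently.

```python
import os
import sys


def set_syspath_sketch(absolute_dependencies: list) -> None:
    # hypothetical helper: the real myutils.set_syspath is not shown here
    for directory in absolute_dependencies:
        # each dependency must be an existing directory
        if not os.path.isdir(directory):
            raise ValueError(f"[{directory}] is not an existing directory")
        # normalize the path (resolves the '..' segments used in the configuration)
        normalized = os.path.normpath(directory)
        # add it to the Python Path only once
        if normalized not in sys.path:
            sys.path.insert(0, normalized)


# usage: make the current directory importable
set_syspath_sketch([os.getcwd()])
```

Failing early on a missing directory makes a wrong [root_dir] visible at startup instead of surfacing later as an import error.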

24.3.2. The main script [main]

The main script [main] is as follows:


# Expects a mysql or pgres parameter
import sys
syntax = f"{sys.argv[0]} mysql / pgres"
error = len(sys.argv) != 2
if not error:
    dbms = sys.argv[1].lower()
    error = dbms != "mysql" and dbms != "pgres"
if error:
    print(f"syntax: {syntax}")
    sys.exit()

# configure the application
import config
config = config.configure({'db': dbms})

# dependencies
from flask import request, Flask
from flask_httpauth import HTTPBasicAuth
import json
import index_controller
from flask_api import status
from SendAdminMail import SendAdminMail
from myutils import json_response
from Logger import Logger
import threading
import time
from random import randint
from TaxError import TaxError

# authentication handler
auth = HTTPBasicAuth()


@auth.verify_password
def verify_password(login, password):
    # list of users
    users = config['users']
    # Iterate through this list
    for user in users:
        if user['login'] == login and user['password'] == password:
            return True
    # no match found
    return False


# Send an email to the administrator
def send_adminmail(config: dict, message: str):
    # send an email to the application administrator
    config_mail = config["adminMail"]
    config_mail["logger"] = config['logger']
    SendAdminMail.send(config_mail, message)


# check the log file
logger = None
error = False
error_message = None
try:
    # Logger
    logger = Logger(config["logsFilename"])
except BaseException as exception:
    # console log
    print(f"The following error occurred: {exception}")
    # log the error
    error = True
    error_message = f"{exception}"
# store the logger in the config
config['logger'] = logger
# error handling
if error:
    # email to the administrator
    send_adminmail(config, error_message)
    # end of application
    sys.exit(1)

# startup log
log = "[server] server startup"
logger.write(f"{log}\n")
print(log)

# Retrieving data from the tax authority
error = False
try:
    # admindata will be a read-only application-scope data
    config["admindata"] = config["layers"]["dao"].get_admindata()
    # success log
    logger.write("[server] database connection successful\n")
except TaxError as ex:
    # log the error
    error = True
    # error log
    log = f"The following error occurred: {ex}"
    # console
    print(log)
    # log file
    logger.write(f"{log}\n")
    # email to the administrator
    send_adminmail(config, log)

# the main thread no longer needs the logger
logger.close()

# if there was an error, stop
if error:
    sys.exit(2)

# The Flask application can start
app = Flask(__name__)


# Home URL
@app.route('/', methods=['GET'])
@auth.login_required
def index():
    ...  # function body shown below


# main only
if __name__ == '__main__':
    # start the server
    app.config.update(ENV="development", DEBUG=True)
    app.run(threaded=True)
  • lines 1-10: the script expects a parameter [mysql / pgres] that specifies the DBMS to use;
  • lines 12–14: the application is configured (Python Path, layers, database);
  • lines 16–28: dependencies required by the application;
  • lines 30-43: authentication management;
  • lines 46–51: a function that sends an email to the application administrator;
    • the function expects two parameters:
      • config: a dictionary with the keys [adminMail] and [logger];
      • message: the message to be sent;
    • lines 49–50: we prepare the email configuration;
    • line 51: we send the email;
  • lines 54–74: we create the logger, which checks that the log file can be opened;
  • lines 70–74: if we were unable to open the log file, we send an email to the administrator and exit;
  • lines 76–79: log the server startup;
  • lines 81–98: retrieve tax administration data from the database;
  • lines 88–98: if we were unable to retrieve this data, we log the error on the console and in the log file, and we send an email to the administrator;
  • lines 100–101: The main thread will no longer log (the created threads will not use the same file descriptor);
  • lines 103–105: if we were unable to connect to the database, we stop;
  • line 122: the server is started in multithreaded mode;
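The argument check of lines 1–10 can be isolated in a small function, which makes it easier to test. This is only a sketch: `parse_dbms` is a hypothetical name, and it raises an exception where the script prints the syntax and exits.

```python
def parse_dbms(argv: list) -> str:
    # argv[0] is the script name, argv[1] must be mysql or pgres
    script = argv[0] if argv else "main.py"
    syntax = f"syntax: {script} mysql / pgres"
    # exactly one parameter is expected
    if len(argv) != 2:
        raise ValueError(syntax)
    # the parameter is case-insensitive
    dbms = argv[1].lower()
    if dbms not in ("mysql", "pgres"):
        raise ValueError(syntax)
    return dbms


# usage
dbms = parse_dbms(["main.py", "MySQL"])
```

In the real script the list passed would be [sys.argv].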

The [index] function (line 114) is as follows:


# Home URL
@app.route('/', methods=['GET'])
@auth.login_required
def index():
    logger = None
    try:
        # logger
        logger = Logger(config["logsFilename"])
        # store it in a thread-specific configuration
        thread_config = {"logger": logger}
        thread_name = threading.current_thread().name
        config[thread_name] = {"config": thread_config}
        # log the request
        logger.write(f"[index] request: {request}\n")
        # pause the thread if requested
        sleep_time = config["sleep_time"]
        if sleep_time != 0:
            # The pause is random so that some threads are interrupted and others are not
            random = randint(0, 1)
            if random == 1:
                # log before pause
                logger.write(f"[index] thread paused for {sleep_time} second(s)\n")
                # pause
                time.sleep(sleep_time)
        # have the request executed by a controller
        result, status_code = index_controller.execute(request, config)
        # Was there a fatal error?
        if status_code == status.HTTP_500_INTERNAL_SERVER_ERROR:
            # send an email to the application administrator
            config_mail = config["adminMail"]
            config_mail["logger"] = logger
            SendAdminMail.send(config_mail, json.dumps(result, ensure_ascii=False))
        # log the response
        logger.write(f"[index] {result}\n")
        # send the response
        return json_response(result, status_code)
    except BaseException as error:
        # log the error if possible
        if logger:
            logger.write(f"[index] {error}\n")
        # prepare the response for the client
        result = {"response": {"errors": [f"{error}"]}}
        # send the response
        return json_response(result, status.HTTP_500_INTERNAL_SERVER_ERROR)
    finally:
        # Close the log file if it was opened
        if logger:
            logger.close()
  • line 4: the function executed when a user requests the URL /. Because the server is multi-threaded (line 122 of the main script), a thread will be created to execute the function. This thread can be interrupted and paused at any time to resume execution a little later. Always keep this in mind when the code accesses a resource shared by all threads. In this case, that resource is the log file: all threads write to it;
  • line 8: we create an instance of the logger. Thus, all threads will have a different instance of the logger. However, all these loggers point to the same log file. It is still important to note that when a thread closes its logger, this has no effect on the loggers of the other threads;
  • lines 9–12: we store the logger in the application’s [config] dictionary under a key named after the thread. Thus, if there are n threads running simultaneously, n entries will be created in the [config] dictionary. [config] is a resource shared among all threads. Therefore, synchronization may be required. I have made an assumption here. I assumed that if two threads simultaneously created their entries in the [config] dictionary and one of them was interrupted by the other, this would have no impact. The interrupted thread could later complete the creation of the entry. If testing showed this assumption to be false, access to line 12 would need to be synchronized;
  • line 10: we put the logger in a dictionary;
  • line 11: [threading.current_thread()] is the thread executing this line, and thus the thread executing the [index] function. We record its name. Each thread has a unique name;
  • line 12: we store the thread’s configuration. From now on, we will always proceed as follows: if there is information that cannot be shared between threads, it will still be placed in the general configuration, but associated with the thread’s name;
  • line 14: we log the request we are currently executing;
  • lines 15–24: we randomly pause certain threads so that they yield the processor to another thread;
    • line 16: we retrieve the pause duration (in seconds) from the configuration;
    • line 17: a pause occurs only if the pause duration is not 0;
    • line 19: a random integer in the range [0, 1]. Therefore, only the values 0 and 1 are possible;
    • line 20: the thread is paused only if the random number is 1;
    • line 22: we log the fact that the thread is about to be paused;
    • line 24: the thread is paused for [sleep_time] seconds;
  • line 26: when the thread wakes up, it has the [index_controller] module execute the request;
  • lines 28–32: if this execution causes a [500 INTERNAL SERVER ERROR], an email is sent to the administrator;
    • lines 30-31: we configure the [config_mail] dictionary that we will pass to the [SendAdminMail] class;
    • line 32: the message sent to the administrator is the JSON string of the result that will be sent to the client;
  • lines 33–34: we log the response that will be sent to the client (line 36);
  • lines 37–44: handle any exceptions;
  • lines 39–40: if the logger exists, we log the error that occurred;
  • lines 47–48: we close the logger if it exists. Ultimately, the thread creates a logger at the start of the request and closes it once the request has been processed;
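The idea of storing per-thread data in a shared dictionary under the thread's name can be tried in isolation. This is a minimal sketch with hypothetical names (`shared_config`, `handle_request`, `worker-i`). Regarding the assumption made for line 12: in CPython, a single dictionary assignment such as `d[key] = value` is generally considered atomic because of the global interpreter lock, so concurrent insertions under distinct keys should not corrupt the dictionary. This is a property of that interpreter, not a guarantee of the language.

```python
import threading

# configuration shared by all threads (stand-in for the application's [config])
shared_config = {"sleep_time": 0}


def handle_request(request_id: int):
    # name of the thread executing this function
    thread_name = threading.current_thread().name
    # thread-specific data stored under a key named after the thread
    shared_config[thread_name] = {"config": {"request_id": request_id}}


# four threads with explicit, distinct names
threads = [
    threading.Thread(target=handle_request, args=(i,), name=f"worker-{i}")
    for i in range(4)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# keys created by the threads
thread_keys = sorted(key for key in shared_config if key.startswith("worker-"))
```

Note that the sketch never removes the per-thread entries. In a long-running server, names may be reused or accumulate, so cleaning up the entry at the end of the request would be worth considering.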

24.3.3. The controller [index_controller]

The controller [index_controller] that executes the requests is the one from the previous version:

Image

24.3.4. Execution

We start the Flask server, the email server |hMailServer|, and the email client |Thunderbird|. We do not start the DBMS. The server stops with the following console logs:


C:\Data\st-2020\dev\python\cours-2020\python3-flask-2020\venv\Scripts\python.exe C:/Data/st-2020/dev/python/cours-2020/python3-flask-2020/impots/http-servers/02/flask/main.py mysql
[server] server startup
The following error occurred: MyException[27, (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on 'localhost:3306' (10061 No connection could be established because the target computer explicitly refused it)
(Background on this error at: http://sqlalche.me/e/13/rvf5)]

Process finished with exit code 2 

The log file [logs.txt] is as follows:


2020-07-23 11:51:38.324752, MainThread: [server] server startup
2020-07-23 11:51:40.355510, MainThread: The following error occurred: MyException[27, (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on 'localhost:3306' (10061 No connection could be established because the target computer explicitly refused it)
(Background on this error at: http://sqlalche.me/e/13/rvf5)]
2020-07-23 11:51:42.464206, MainThread: [SendAdminMail] Message sent to [guest@localhost.com]: [The following error occurred: MyException[27, (mysql.connector.errors.InterfaceError) 2003: Can't connect to MySQL server on 'localhost:3306' (10061 No connection could be established because the target computer explicitly refused it)
(Background on this error at: http://sqlalche.me/e/13/rvf5)]]

Using Thunderbird, check the administrator's emails [guest@localhost.com]:

Image

Then start the DBMS and request the URL [http://127.0.0.1:5000/?mari%C3%A9=oui&enfants=3&salaire=200000]. The logs become as follows:


2020-07-23 11:56:38.891753, MainThread: [server] server startup
2020-07-23 11:56:38.987999, MainThread: [server] database connection successful
2020-07-23 11:56:40.586747, MainThread: [server] server startup
2020-07-23 11:56:40.655254, MainThread: [server] database connection successful
2020-07-23 11:56:54.528360, Thread-2: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=3&salaire=200000' [GET]>
2020-07-23 11:56:54.530653, Thread-2: [index] {'response': {'result': {'married': 'yes', 'children': 3, 'salary': 200000, 'tax': 42842, 'surcharge': 17283, 'rate': 0.41, 'discount': 0, 'reduction': 0}}}
  • lines 1-4: note that the server starts twice because the [DEBUG=True] mode (line 121 of the main script) enables Flask's auto-reloader, which runs the script a second time in a child process;
  • lines 5-6: the logs give us an idea of the execution time of a request, here 2.293 milliseconds;
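The duration quoted above can be recomputed from the two timestamps. The sketch below assumes the log format shown in this section (timestamp, comma, thread name). `parse_timestamp` is a hypothetical helper, and the two log lines are abbreviated after the timestamp.

```python
from datetime import datetime


def parse_timestamp(log_line: str) -> datetime:
    # the timestamp is everything before the first comma
    stamp = log_line.split(",", 1)[0]
    return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S.%f")


# the two log lines of Thread-2 (content after the timestamp abbreviated)
request_line = "2020-07-23 11:56:54.528360, Thread-2: [index] request"
response_line = "2020-07-23 11:56:54.530653, Thread-2: [index] response"

# difference between the two timestamps
elapsed = parse_timestamp(response_line) - parse_timestamp(request_line)
elapsed_ms = elapsed.total_seconds() * 1000
```

One caveat: the string form of a time whose microseconds are exactly 0 has no fractional part, so a robust parser would need a fallback format without `.%f`.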

24.4. The web client

Image

The [http-clients/02] directory is created by copying the [http-clients/01] directory. We then make a few modifications.

24.4.1. The configuration

The [config] configuration of the [http-clients/02] application is the same as that of the [http-clients/01] application, with a few minor differences:


def configure(config: dict) -> dict:
    import os

    # step 1 ------

    # directory of this file
    script_dir = os.path.dirname(os.path.abspath(__file__))

    # root directory
    root_dir = "C:/Data/st-2020/dev/python/cours-2020/python3-flask-2020"

    # absolute dependencies
    absolute_dependencies = [
        # project folders
        # BaseEntity, MyException
        f"{root_dir}/classes/02/entities",
        # TaxDaoInterface, TaxBusinessInterface, TaxUiInterface
        f"{root_dir}/impots/v04/interfaces",
        # AbstractTaxDao, TaxConsole, TaxBusiness
        f"{root_dir}/impots/v04/services",
        # TaxDaoWithAdminDataInDatabase
        f"{root_dir}/impots/v05/services",
        # AdminData, TaxError, TaxPayer
        f"{root_dir}/impots/v04/entities",
        # Constants, tax brackets
        f"{root_dir}/impots/v05/entities",
        # TaxDaoWithHttpClient
        f"{script_dir}/../services",
        # configuration scripts
        script_dir,
        # Logger
        f"{root_dir}/impots/http-servers/02/utilities",
    ]

    # Set the syspath
    from myutils import set_syspath
    set_syspath(absolute_dependencies)

    # step 2 ------
    # Configure the application with constants
    config.update({
        # taxpayers file
        "taxpayersFilename": f"{script_dir}/../data/input/taxpayersdata.txt",
        # results file
        "resultsFilename": f"{script_dir}/../data/output/results.json",
        # error file
        "errorsFilename": f"{script_dir}/../data/output/errors.txt",
        # log file
        "logsFilename": f"{script_dir}/../data/logs/logs.txt",
        # tax calculation server
        "server": {
            "urlServer": "http://127.0.0.1:5000/",
            "authBasic": True,
            "user": {
                "login": "admin",
                "password": "admin"
            }
        },
        # debug mode
        "debug": True
    }
    )

    # step 3 ------
    # instantiating layers
    import config_layers
    config['layers'] = config_layers.configure(config)

    # return the configuration
    return config
  • lines 31-32: we will use the same logger |Logger| as the one used for the server;
  • line 49: the absolute path to the log file;
  • line 60: the [debug=True] mode is used to write the web server's responses to the log file;

24.4.2. The [dao] layer

The code for the [TaxDaoWithHttpClient] class changes slightly:


# imports

import requests
from flask_api import status




class TaxDaoWithHttpClient(AbstractTaxDao, TaxBusinessInterface):

    # constructor
    def __init__(self, config: dict):
        # parent initialization
        AbstractTaxDao.__init__(self, config)
        # store configuration elements
        # General configuration
        self.__config = config
        # server
        self.__config_server = config["server"]
        # debug mode
        self.__debug = config["debug"]
        # logger
        self.__logger = None

    # unused method
    def get_admindata(self) -> AdminData:
        pass

    # tax calculation
    def calculate_tax(self, taxpayer: TaxPayer, admindata: AdminData = None):
        # let exceptions propagate
        # ... (the HTTP request to the server, which produces [response], is not shown here)
        # debug mode?
        if self.__debug:
            # logger
            if not self.__logger:
                self.__logger = self.__config['logger']
            # logging
            self.__logger.write(f"{response.text}\n")
        # HTTP response status code
        status_code = response.status_code
        
  • line 17: we store the general configuration. We will see later that when the constructor of the [TaxDaoWithHttpClient] class runs, the [config] dictionary does not yet contain the [logger] key used on line 37. This is why we cannot initialize [self.__logger] (line 23) in the constructor;
  • line 21: we have added a [debug] key to the configuration that controls the logging in lines 33–39;
  • line 34: if we are in [debug] mode;
  • lines 36–37: optional initialization of the [self.__logger] property. When the [calculate_tax] method is used, the [logger] key is part of the [config] dictionary;
  • line 39: we log the text document associated with the server’s HTTP response;

The [dao] layer will be executed simultaneously by multiple threads. However, here we create a single instance of this layer (see config_layers). We must therefore verify that the code does not involve write access to shared data, typically the properties of the [TaxDaoWithHttpClient] class that implements the [dao] layer. In the code above, line 37 does modify a property of the class instance. Here, this has no consequences because all threads share the same logger: whichever thread performs the assignment, the value stored is the same. If this had not been the case, access to line 37 would have had to be synchronized.
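If the lazy initialization of line 37 did need to be synchronized, one common approach is to protect it with a lock and check the property a second time once the lock is held. The following is a sketch of that pattern, not code from the application: `LazyLoggerHolder` and `get_logger` are hypothetical names.

```python
import threading


class LazyLoggerHolder:
    # hypothetical class illustrating a synchronized lazy initialization
    def __init__(self, config: dict):
        self._config = config
        self._logger = None
        self._lock = threading.Lock()

    def get_logger(self):
        # first check without the lock: cheap once the logger is set
        if self._logger is None:
            with self._lock:
                # second check: another thread may have set it in the meantime
                if self._logger is None:
                    self._logger = self._config["logger"]
        return self._logger


# usage: eight threads ask for the logger at the same time
marker = object()  # stands in for a real logger
holder = LazyLoggerHolder({"logger": marker})
seen = []
workers = [
    threading.Thread(target=lambda: seen.append(holder.get_logger()))
    for _ in range(8)
]
for worker in workers:
    worker.start()
for worker in workers:
    worker.join()
```

The lock is only taken while the property is still unset, so the cost in the common case is a single comparison.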

24.4.3. The main script

The main script [main] evolves as follows:


# configure the application

import config
config = config.configure({})

# dependencies
from TaxError import TaxError
import random
import sys
import threading
from Logger import Logger


# executing the [dao] layer in a thread
# taxpayers is a list of taxpayers
def thread_function(dao, logger, taxpayers: list):
    ...  # function body not shown here


# list of client threads
threads = []
logger = None
# code
try:
    # logger
    logger = Logger(config["logsFilename"])
    # store it in the config
    config["logger"] = logger
    # retrieve the [dao] layer
    dao = config["layers"]["dao"]
    # read taxpayer data
    taxpayers = dao.get_taxpayers_data()["taxpayers"]
    # Are there any taxpayers?
    if not taxpayers:
        raise TaxError(36, f"No valid taxpayers in the file {config['taxpayersFilename']}")
    # Calculate taxpayers' taxes using multiple threads
    i = 0
    l_taxpayers = len(taxpayers)
    while i < len(taxpayers):
        # each thread will process 1 to 4 taxpayers
        nb_taxpayers = min(l_taxpayers - i, random.randint(1, 4))
        # the list of taxpayers processed by the thread
        thread_taxpayers = taxpayers[slice(i, i + nb_taxpayers)]
        # Increment i for the next thread
        i += nb_taxpayers
        # create the thread
        thread = threading.Thread(target=thread_function, args=(dao, logger, thread_taxpayers))
        # add it to the list of threads in the main script
        threads.append(thread)
        # Start the thread—this operation is asynchronous—we don't wait for the thread's result
        thread.start()
    # The main thread waits for all the threads it has started to finish
    for thread in threads:
        thread.join()
    # Here, all threads have finished their work—each has modified one or more [taxpayer] objects
    # we save the results to the JSON file
    dao.write_taxpayers_results(taxpayers)
except BaseException as error:
    # display the error
    print(f"The following error occurred: {error}")
finally:
    # close the logger
    if logger:
        logger.close()
    # we're done
    print("Work completed...")
    # exit the main thread (note: sys.exit() does not stop non-daemon threads that are still running)
    sys.exit()
  • The main script differs from that of the previous client in that it will generate multiple execution threads to send requests to the server. The client in version 6 sent all its requests sequentially. Request #i was only made once the response to request #[i-1] was received. Here, we want to see how the server behaves when it receives multiple simultaneous requests. For this, we need threads;
  • line 21: the generated threads will be placed in a list. It is important to understand that the [main] script is also executed by a thread called [MainThread]. This main thread will create other threads that will be responsible for calculating the tax for one or more taxpayers;
  • line 26: we create a logger. This will be shared by all threads;
  • line 32: we retrieve all taxpayers whose taxes need to be calculated;
  • lines 39–51: we distribute these taxpayers across several threads;
  • lines 40–41: each thread will process 1 to 4 taxpayers. This number is determined randomly;
    • [random.randint(1, 4)] randomly generates a number from the list [1, 2, 3, 4];
    • the thread cannot have more than [l_taxpayers - i] taxpayers, where [l_taxpayers - i] is the number of taxpayers who have not yet been assigned to a thread;
    • we therefore take the minimum of the two values;
  • line 43: once [nb_taxpayers], the number of taxpayers processed by the thread, is known, we take these from the list of taxpayers:
    • [slice(10,12)] is the set of indices [10, 11]: the upper bound 12 is excluded;
    • [taxpayers[slice(10,12)]] is the list [taxpayers[10], taxpayers[11]];
  • line 45: we increment the value of i, which controls the loop on line 39;
  • line 47: we create a thread:
    • [target=thread_function] sets the function that the thread will execute. This is the function from lines 16–17. It expects three parameters;
    • [args] is the tuple of the three arguments expected by the [thread_function] function;

Creating a thread does not execute it. It simply creates an object;

  • Lines 48–49: The thread that has just been created is added to the list of threads created by the main thread;
  • line 51: the thread is launched. It will then run in parallel with the other active threads. Here, it will execute the [thread_function] with the arguments provided to it;
  • lines 53–54: the main thread waits for each of the threads it has launched. Let’s take an example:
    • the main thread has launched three threads [th1, th2, th3];
    • The main thread waits for each of the threads (lines 53–54) in the order of the for loop: [th1, th2, th3];
    • Suppose the threads finish in the order [th2, th1, th3];
    • The main thread waits for th1 to finish. When th2 finishes, nothing happens;
    • When th1 finishes, the main thread waits for th2. However, th2 has already finished. The main thread then moves on to the next thread and waits for th3;
    • when th3 finishes, the main thread has finished waiting and proceeds to execute line 57;
  • line 57 writes the results to the results file. This is a good example of object references:
    • line 43: the list [thread_taxpayers] associated with a thread contains copies of the object references contained in the list [taxpayers];
    • we know that the tax calculation will modify the objects pointed to by the references in the [thread_taxpayers] list. These objects will be updated with the results of the tax calculation. However, the references themselves are not modified. Therefore, the references in the initial [taxpayers] list “see” or “point to” the modified objects;
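
The distribution of taxpayers across threads and the role of shared object references can be reproduced without the web server. The following is a minimal sketch, not the tutorial's code: the [Taxpayer] class, the [worker] function and the "10% of the salary" rule are hypothetical stand-ins for the real entity and for [dao.calculate_tax].

```python
import random
import threading


class Taxpayer:
    """Hypothetical stand-in for the tutorial's taxpayer entity."""

    def __init__(self, id_: int, salary: int):
        self.id = id_
        self.salary = salary
        self.tax = None


def worker(chunk: list) -> None:
    # the thread modifies the objects in place; the references are untouched
    for taxpayer in chunk:
        taxpayer.tax = taxpayer.salary // 10  # placeholder rule, not the real calculation


taxpayers = [Taxpayer(i, 10000 * i) for i in range(1, 12)]
threads = []
chunk_sizes = []
i = 0
while i < len(taxpayers):
    # each thread receives 1 to 4 taxpayers, never more than what is left
    nb = min(len(taxpayers) - i, random.randint(1, 4))
    # taxpayers[i:i + nb] is equivalent to taxpayers[slice(i, i + nb)]:
    # a new list holding copies of the references, upper bound excluded
    chunk = taxpayers[i:i + nb]
    chunk_sizes.append(nb)
    i += nb
    thread = threading.Thread(target=worker, args=(chunk,))
    threads.append(thread)
    thread.start()

# wait for every thread, in creation order
for thread in threads:
    thread.join()

# the original list sees the modifications made through the chunks
print(chunk_sizes)
print([taxpayer.tax for taxpayer in taxpayers])
```

The chunk sizes vary from one run to the next, but they always add up to the number of taxpayers, and every object in the original list carries its computed tax once all the joins have returned.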

The [thread_function] executed by the threads is as follows:


# execution of the [dao] layer in a thread
# taxpayers is a list of taxpayers
def thread_function(dao, logger, taxpayers: list):
    # log start of thread
    thread_name = threading.current_thread().name
    logger.write(f"Start of thread [{thread_name}] with {len(taxpayers)} taxpayer(s)\n")
    # calculate the tax for the taxpayers
    for taxpayer in taxpayers:
        # log
        logger.write(f"Start of tax calculation for {taxpayer}\n")
        # synchronous tax calculation
        dao.calculate_tax(taxpayer)
        # log
        logger.write(f"End of tax calculation for {taxpayer}\n")
    # log end of thread
    logger.write(f"End of thread [{thread_name}]\n")
  • Functions executed simultaneously by multiple threads are often tricky to write: you must always verify that the code does not attempt to modify data shared between threads. When this occurs, you must implement synchronized access to the shared data that is about to be modified;
  • Line 3: The function receives three parameters:
    • [dao]: a reference to the [dao] layer. This data is shared;
    • [logger]: a reference to the logger. This data is shared;
    • [taxpayers]: a list of taxpayers. This data is not shared: each thread manages a different list;
  • Let’s examine the two references [dao, logger]:
    • we saw that the object pointed to by the [dao] reference has a [self.__logger] attribute that the threads assign to. Since every thread assigns the same value (the shared logger), these concurrent writes are harmless;
    • the [logger] reference points to an object that holds an open log file. We saw that concurrent writes to this file could be a problem. For this reason, writing to the file has been synchronized with a lock;
  • lines 5–6: we log the thread’s name and the number of taxpayers it must manage;
  • lines 8–14: calculation of taxpayers' taxes;
  • line 16: log the end of the thread;
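
The need for synchronized access to shared data can be illustrated with a counter updated by several threads. This is a sketch under assumptions: the [Counter] class and the [work] function are hypothetical and do not appear in the application. Only the pattern (a lock taken around the modification) mirrors what the [Logger] class does for its file.

```python
import threading


class Counter:
    """Hypothetical shared object whose update is protected by a lock."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self) -> None:
        # only one thread at a time can execute the read-modify-write below
        with self._lock:
            self.value += 1


def work(counter: Counter, n: int) -> None:
    for _ in range(n):
        counter.increment()


counter = Counter()
threads = [threading.Thread(target=work, args=(counter, 10_000)) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter.value)  # 50000
```

Without the lock, two threads could read the same value before either writes it back, and some increments could be lost.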

24.4.4. Execution

Let’s start the web server as described in the previous section (web server, DBMS, hMailServer, Thunderbird), then run the client’s [main] script. In the files [data/output/errors.txt, data/output/results.json], we get the same results as in the previous version. In the file [data/logs/logs.txt], we have the following logs:


2020-07-24 10:05:20.942404, Thread-1: Start of thread [Thread-1] with 1 taxpayer(s)
2020-07-24 10:05:20.943458, Thread-1: Start of tax calculation for {"id": 1, "married": "yes", "children": 2, "salary": 55555}
2020-07-24 10:05:20.943458, Thread-2: Start of thread [Thread-2] with 3 taxpayer(s)
2020-07-24 10:05:20.946502, Thread-3: Start of thread [Thread-3] with 1 taxpayer(s)
2020-07-24 10:05:20.946502, Thread-2: Start of tax calculation for {"id": 2, "married": "yes", "children": 2, "salary": 50000}
2020-07-24 10:05:20.947003, Thread-3: Start of tax calculation for {"id": 5, "married": "no", "children": 3, "salary": 100000}
2020-07-24 10:05:20.947003, Thread-4: Start of thread [Thread-4] with 3 taxpayer(s)
2020-07-24 10:05:20.950324, Thread-4: Start of tax calculation for {"id": 6, "married": "yes", "children": 3, "salary": 100000}
2020-07-24 10:05:20.948449, Thread-5: Start of thread [Thread-5] with 3 taxpayer(s)
2020-07-24 10:05:20.953645, Thread-5: Start of tax calculation for {"id": 9, "married": "yes", "children": 2, "salary": 30000}
2020-07-24 10:05:20.976143, Thread-1: {"response": {"result": {"married": "yes", "children": 2, "salary": 55555, "tax": 2814, "surcharge": 0, "rate": 0.14, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:20.976695, Thread-1: End of tax calculation for {"id": 1, "married": "yes", "children": 2, "salary": 55555, "tax": 2814, "surcharge": 0, "rate": 0.14, "discount": 0, "reduction": 0}
2020-07-24 10:05:20.976695, Thread-1: End of thread [Thread-1]
2020-07-24 10:05:21.973914, Thread-2: {"response": {"result": {"married": "yes", "children": 2, "salary": 50000, "tax": 1384, "surcharge": 0, "rate": 0.14, "discount": 384, "reduction": 347}}}
2020-07-24 10:05:21.973914, Thread-2: End of tax calculation for {"id": 2, "married": "yes", "children": 2, "salary": 50000, "tax": 1384, "surcharge": 0, "rate": 0.14, "discount": 384, "reduction": 347}
2020-07-24 10:05:21.973914, Thread-2: Start of tax calculation for {"id": 3, "married": "yes", "children": 3, "salary": 50000}
2020-07-24 10:05:21.977130, Thread-4: {"response": {"result": {"married": "yes", "children": 3, "salary": 100000, "tax": 9200, "surcharge": 2180, "rate": 0.3, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:21.977130, Thread-4: End of tax calculation for {"id": 6, "married": "yes", "children": 3, "salary": 100000, "tax": 9200, "surcharge": 2180, "rate": 0.3, "discount": 0, "reduction": 0}
2020-07-24 10:05:21.977130, Thread-4: Start of tax calculation for {"id": 7, "married": "yes", "children": 5, "salary": 100000}
2020-07-24 10:05:21.982634, Thread-3: {"response": {"result": {"married": "no", "children": 3, "salary": 100000, "tax": 16782, "surcharge": 7176, "rate": 0.41, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:21.982634, Thread-5: {"response": {"result": {"married": "yes", "children": 2, "salary": 30000, "tax": 0, "surcharge": 0, "rate": 0.0, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:21.983134, Thread-3: End of tax calculation for {"id": 5, "married": "no", "children": 3, "salary": 100000, "tax": 16782, "surcharge": 7176, "rate": 0.41, "discount": 0, "reduction": 0}
2020-07-24 10:05:21.983134, Thread-5: End of tax calculation for {"id": 9, "married": "yes", "children": 2, "salary": 30000, "tax": 0, "surcharge": 0, "rate": 0.0, "discount": 0, "reduction": 0}
2020-07-24 10:05:21.983134, Thread-3: End of thread [Thread-3]
2020-07-24 10:05:21.983763, Thread-5: Start of tax calculation for {"id": 10, "married": "no", "children": 0, "salary": 200000}
2020-07-24 10:05:22.008562, Thread-5: {"response": {"result": {"married": "no", "children": 0, "salary": 200000, "tax": 64210, "surcharge": 7498, "rate": 0.45, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:22.008562, Thread-5: End of tax calculation for {"id": 10, "married": "no", "children": 0, "salary": 200000, "tax": 64210, "surcharge": 7498, "rate": 0.45, "discount": 0, "reduction": 0}
2020-07-24 10:05:22.009062, Thread-5: Start of tax calculation for {"id": 11, "married": "yes", "children": 3, "salary": 200000}
2020-07-24 10:05:22.016848, Thread-5: {"response": {"result": {"married": "yes", "children": 3, "salary": 200000, "tax": 42842, "surcharge": 17283, "rate": 0.41, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:22.017349, Thread-5: End of tax calculation for {"id": 11, "married": "yes", "children": 3, "salary": 200000, "tax": 42842, "surcharge": 17283, "rate": 0.41, "discount": 0, "reduction": 0}
2020-07-24 10:05:22.017349, Thread-5: End of thread [Thread-5]
2020-07-24 10:05:23.008486, Thread-2: {"response": {"result": {"married": "yes", "children": 3, "salary": 50000, "tax": 0, "surcharge": 0, "rate": 0.14, "discount": 720, "reduction": 0}}}
2020-07-24 10:05:23.008486, Thread-2: End of tax calculation for {"id": 3, "married": "yes", "children": 3, "salary": 50000, "tax": 0, "surcharge": 0, "rate": 0.14, "discount": 720, "reduction": 0}
2020-07-24 10:05:23.009749, Thread-2: Start of tax calculation for {"id": 4, "married": "no", "children": 2, "salary": 100000}
2020-07-24 10:05:23.011722, Thread-4: {"response": {"result": {"married": "yes", "children": 5, "salary": 100000, "tax": 4230, "surcharge": 0, "rate": 0.14, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:23.013723, Thread-4: End of tax calculation for {"id": 7, "married": "yes", "children": 5, "salary": 100000, "tax": 4230, "surcharge": 0, "rate": 0.14, "discount": 0, "reduction": 0}
2020-07-24 10:05:23.013723, Thread-4: Start of tax calculation for {"id": 8, "married": "no", "children": 0, "salary": 100000}
2020-07-24 10:05:23.024135, Thread-2: {"response": {"result": {"married": "no", "children": 2, "salary": 100000, "tax": 19884, "surcharge": 4480, "rate": 0.41, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:23.024135, Thread-2: End of tax calculation for {"id": 4, "married": "no", "children": 2, "salary": 100000, "tax": 19884, "surcharge": 4480, "rate": 0.41, "discount": 0, "reduction": 0}
2020-07-24 10:05:23.025178, Thread-2: End of thread [Thread-2]
2020-07-24 10:05:23.025178, Thread-4: {"response": {"result": {"married": "no", "children": 0, "salary": 100000, "tax": 22986, "surcharge": 0, "rate": 0.41, "discount": 0, "reduction": 0}}}
2020-07-24 10:05:23.026191, Thread-4: End of tax calculation for {"id": 8, "married": "no", "children": 0, "salary": 100000, "tax": 22986, "surcharge": 0, "rate": 0.41, "discount": 0, "reduction": 0}
2020-07-24 10:05:23.026191, Thread-4: End of thread [Thread-4]
  • These logs show that five threads were launched to calculate the taxes for 11 taxpayers. These five threads sent simultaneous requests to the tax calculation server. It’s important to understand how this works:
    • Thread [Thread-1] is launched first. When it has the CPU, it executes the code until it sends its HTTP request. Since it must wait for the result of this request, it is automatically put on hold. It then loses the CPU, and another thread takes it over;
    • lines 1–10: the same process repeats for each of the 5 threads. Thus, the 5 threads are launched before thread [Thread-1] has even received its response on line 11;
  • The threads do not finish in the order in which they were launched. For example, thread [Thread-3] finishes on line 24, well before thread [Thread-2], which was launched earlier and only finishes on line 40;
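
The behaviour seen in these logs (a thread that waits gives up the CPU, and threads finish in an order unrelated to their launch order) can be simulated with [time.sleep] standing in for the HTTP round trip. This is only a sketch: the [fake_request] function, the thread names and the delays are hypothetical.

```python
import threading
import time

finished = []
finished_lock = threading.Lock()


def fake_request(name: str, delay: float) -> None:
    # while it sleeps, the thread does not use the CPU: other threads can run
    time.sleep(delay)
    # the list of finished threads is shared, so access to it is synchronized
    with finished_lock:
        finished.append(name)


# launch order: th1, th2, th3
delays = {"th1": 0.3, "th2": 0.1, "th3": 0.5}
start = time.perf_counter()
threads = [
    threading.Thread(target=fake_request, args=(name, delay), name=name)
    for name, delay in delays.items()
]
for thread in threads:
    thread.start()
# join in launch order: waiting for th1 also covers th2, which ends earlier
for thread in threads:
    thread.join()
elapsed = time.perf_counter() - start

print(finished)  # ['th2', 'th1', 'th3']
print(f"elapsed: {elapsed:.2f}s for {sum(delays.values()):.1f}s of cumulated waiting")
```

Because the waits overlap, the total time is close to the longest delay rather than to the sum of the delays, which is what the client gains by sending its requests from several threads.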

On the server side, the logs in the file [data/logs/logs.txt] are as follows:


2020-07-24 10:05:01.692980, MainThread: [server] server startup
2020-07-24 10:05:01.877251, MainThread: [server] successful database connection
2020-07-24 10:05:03.596162, MainThread: [server] server startup
2020-07-24 10:05:03.661160, MainThread: [server] successful database connection
2020-07-24 10:05:20.968053, Thread-2: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=2&salaire=50000' [GET]>
2020-07-24 10:05:20.969132, Thread-2: [index] Thread paused for 1 second
2020-07-24 10:05:20.970316, Thread-3: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=3&salaire=100000' [GET]>
2020-07-24 10:05:20.970316, Thread-3: [index] Thread paused for 1 second
2020-07-24 10:05:20.971335, Thread-4: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=2&salaire=55555' [GET]>
2020-07-24 10:05:20.972563, Thread-4: [index] {'response': {'result': {'married': 'yes', 'children': 2, 'salary': 55555, 'tax': 2814, 'surcharge': 0, 'rate': 0.14, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:20.974796, Thread-5: [index] request: <Request 'http://127.0.0.1:5000/?marié=non&enfants=3&salaire=100000' [GET]>
2020-07-24 10:05:20.974796, Thread-5: [index] Thread paused for 1 second
2020-07-24 10:05:20.976143, Thread-6: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=2&salaire=30000' [GET]>
2020-07-24 10:05:20.976143, Thread-6: [index] Thread paused for 1 second
2020-07-24 10:05:21.970615, Thread-2: [index] {'response': {'result': {'married': 'yes', 'children': 2, 'salary': 50000, 'tax': 1384, 'surcharge': 0, 'rate': 0.14, 'discount': 384, 'reduction': 347}}}
2020-07-24 10:05:21.973914, Thread-3: [index] {'response': {'result': {'married': 'yes', 'children': 3, 'salary': 100000, 'tax': 9200, 'surcharge': 2180, 'rate': 0.3, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:21.977130, Thread-6: [index] {'response': {'result': {'married': 'yes', 'children': 2, 'salary': 30000, 'tax': 0, 'surcharge': 0, 'rate': 0.0, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:21.977130, Thread-5: [index] {'response': {'result': {'married': 'no', 'children': 3, 'salary': 100000, 'tax': 16782, 'surcharge': 7176, 'rate': 0.41, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:22.001693, Thread-7: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=3&salaire=50000' [GET]>
2020-07-24 10:05:22.003013, Thread-7: [index] Thread paused for 1 second
2020-07-24 10:05:22.003013, Thread-8: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=5&salaire=100000' [GET]>
2020-07-24 10:05:22.003013, Thread-8: [index] Thread paused for 1 second
2020-07-24 10:05:22.005871, Thread-9: [index] request: <Request 'http://127.0.0.1:5000/?marié=non&enfants=0&salaire=200000' [GET]>
2020-07-24 10:05:22.006370, Thread-9: [index] {'response': {'result': {'married': 'no', 'children': 0, 'salary': 200000, 'tax': 64210, 'surcharge': 7498, 'rate': 0.45, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:22.014170, Thread-10: [index] request: <Request 'http://127.0.0.1:5000/?marié=oui&enfants=3&salaire=200000' [GET]>
2020-07-24 10:05:22.014170, Thread-10: [index] {'response': {'result': {'married': 'yes', 'children': 3, 'salary': 200000, 'tax': 42842, 'surcharge': 17283, 'rate': 0.41, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:23.003533, Thread-7: [index] {'response': {'result': {'married': 'yes', 'children': 3, 'salary': 50000, 'tax': 0, 'surcharge': 0, 'rate': 0.14, 'discount': 720, 'reduction': 0}}}
2020-07-24 10:05:23.006434, Thread-8: [index] {'response': {'result': {'married': 'yes', 'children': 5, 'salary': 100000, 'tax': 4230, 'surcharge': 0, 'rate': 0.14, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:23.018026, Thread-11: [index] request: <Request 'http://127.0.0.1:5000/?marié=non&enfants=2&salaire=100000' [GET]>
2020-07-24 10:05:23.019074, Thread-11: [index] {'response': {'result': {'married': 'no', 'children': 2, 'salary': 100000, 'tax': 19884, 'surcharge': 4480, 'rate': 0.41, 'discount': 0, 'reduction': 0}}}
2020-07-24 10:05:23.021447, Thread-12: [index] request: <Request 'http://127.0.0.1:5000/?marié=non&enfants=0&salaire=100000' [GET]>
2020-07-24 10:05:23.022447, Thread-12: [index] {'response': {'result': {'married': 'no', 'children': 0, 'salary': 100000, 'tax': 22986, 'surcharge': 0, 'rate': 0.41, 'discount': 0, 'reduction': 0}}}
  • We can see that 11 threads processed the 11 taxpayers;
  • some threads were put on hold (lines 6, 8, 12, 14, 20, 22) and others were not (lines 9, 23, 25, 29, 31);

24.5. [DAO] Layer Tests

As we did in the previous version, we test the client’s [DAO] layer. The principle is exactly the same:

Image

The test class will be executed in the following environment:

Image

  • Configuration [2] is identical to configuration [1], which we just examined;

The test class [TestHttpClientDao] is as follows:


import unittest

from Logger import Logger


class TestHttpClientDao(unittest.TestCase):

    def test_1(self) -> None:
        from TaxPayer import TaxPayer

        # {'married': 'yes', 'children': 2, 'salary': 55555,
        # 'tax': 2814, 'surcharge': 0, 'discount': 0, 'reduction': 0, 'rate': 0.14}
        taxpayer = TaxPayer().fromdict({"married": "yes", "children": 2, "salary": 55555})
        dao.calculate_tax(taxpayer)
        # verification
        self.assertAlmostEqual(taxpayer.tax, 2815, delta=1)
        self.assertEqual(taxpayer.discount, 0)
        self.assertEqual(taxpayer.reduction, 0)
        self.assertAlmostEqual(taxpayer.rate, 0.14, delta=0.01)
        self.assertEqual(taxpayer.surcharge, 0)

    


if __name__ == '__main__':
    # configure the application
    import config
    config = config.configure({})

    # logger
    logger = Logger(config["logsFilename"])
    # Store it in the config
    config["logger"] = logger
    # retrieve the [dao] layer
    dao = config["layers"]["dao"]

    # run the test methods
    print("Tests in progress...")
    unittest.main()
  • We create an execution configuration for this test;
  • We start the web server with its entire environment;
  • We run the test;

The results are as follows:


C:\Data\st-2020\dev\python\cours-2020\python3-flask-2020\venv\Scripts\python.exe C:/Data/st-2020/dev/python/cours-2020/python3-flask-2020/impots/http-clients/02/tests/TestHttpClientDao.py
Tests in progress...
...........
----------------------------------------------------------------------
Ran 11 tests in 6.128s

OK

Process finished with exit code 0
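
The assertions of [test_1] can also be tried without starting the web server by replacing the [dao] layer with a stub. The sketch below is an assumption-laden illustration, not part of the application: the [FakeDao] class is hypothetical, it works on a plain dictionary instead of a [TaxPayer] object, and it simply returns the values seen in the logs for this taxpayer.

```python
import unittest


class FakeDao:
    """Hypothetical stub that replaces the HTTP [dao] layer."""

    def calculate_tax(self, taxpayer: dict) -> None:
        # canned result taken from the logs for married=yes, children=2, salary=55555
        taxpayer.update({"tax": 2814, "surcharge": 0, "discount": 0,
                         "reduction": 0, "rate": 0.14})


# module-level reference, as in the tutorial's test script
dao = FakeDao()


class TestWithFakeDao(unittest.TestCase):

    def test_1(self) -> None:
        taxpayer = {"married": "yes", "children": 2, "salary": 55555}
        dao.calculate_tax(taxpayer)
        # same checks as in the tutorial, with a tolerance on tax and rate
        self.assertAlmostEqual(taxpayer["tax"], 2815, delta=1)
        self.assertEqual(taxpayer["discount"], 0)
        self.assertEqual(taxpayer["reduction"], 0)
        self.assertAlmostEqual(taxpayer["rate"], 0.14, delta=0.01)
        self.assertEqual(taxpayer["surcharge"], 0)


# run the test case explicitly so that the script does not call sys.exit()
suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestWithFakeDao)
result = unittest.TextTestRunner(verbosity=0).run(suite)
print(f"{result.testsRun} test(s) run, success={result.wasSuccessful()}")
```

Such a stub only checks the test logic itself. The real test class remains necessary to verify the exchange between the client and the server.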