
Create the workload class

We now create the Python class file that describes our workload.

This is going to be a regular .py file which we need to create from scratch.

Fortunately, the built-in dbworkload util gen_stub command generates a skeleton that we can use as a starting point.

Create the stub file

Execute the following command:

dbworkload util gen_stub -i bank.sql

A new file, bank.py, will be created in your directory.

Review the workload class file

The file is too long to paste here in one piece, so we will go through it in sections.

Library imports

The first few lines define the imports:

import datetime as dt
import psycopg
import random
import time
from uuid import uuid4

These are the libraries most commonly used in a workload class. More can be added as needed.

Class init

Here is the definition of class Bank.

class Bank:
    def __init__(self, args: dict):
        # args is a dict of string passed with the --args flag

        self.think_time: float = float(args.get("think_time", 5) / 1000)

        # you can arbitrarily add any variables you want
        self.my_var = 1

        # translation table for efficiently generating a string
        # -------------------------------------------------------
        # make translation table from 0..255 to a..z, 0..9, A..Z
        # the length must be 256
        self.tbl = bytes.maketrans(
            bytearray(range(256)),
            bytearray(
                [ord(b"a") + b % 26 for b in range(113)]
                + [ord(b"0") + b % 10 for b in range(30)]
                + [ord(b"A") + b % 26 for b in range(113)]
            ),
        )

The init adds some convenient examples of how to pass runtime arguments and set class variables. self.tbl is used to generate random strings; we will review that in the next sections.
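The think_time handling above can be sketched in isolation. This is only an illustration, assuming the value is expressed in milliseconds (as the division by 1000 suggests) and may arrive either as a number or as a string. parse_think_time is a hypothetical helper, not part of dbworkload; it converts before dividing, so a string value such as "250" does not raise a TypeError.

```python
def parse_think_time(args: dict) -> float:
    """Return the think time in seconds.

    Hypothetical helper: assumes args["think_time"] is in milliseconds
    and may be an int, a float, or a numeric string.
    """
    # Convert to float first, then scale from milliseconds to seconds.
    return float(args.get("think_time", 5)) / 1000


default_seconds = parse_think_time({})
from_string = parse_think_time({"think_time": "250"})
from_int = parse_think_time({"think_time": 1000})
```

With no argument supplied, the default of 5 milliseconds applies.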

setup()

Next up is setup(). dbworkload invokes this function once for each execution thread, when that thread starts.

    # the setup() function is executed only once
    # when a new executing thread is started.
    # Also, the function is a vector to receive the executing thread's unique id and the total thread count
    def setup(self, conn: psycopg.Connection, id: int, total_thread_count: int):
        with conn.cursor() as cur:
            print(
                f"My thread ID is {id}. The total count of threads is {total_thread_count}"
            )
            print(cur.execute(f"select version()").fetchone()[0])
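One possible use of the two values passed to setup() is to give each thread its own slice of a key space, so that threads do not contend on the same rows. The sketch below is an assumption-laden illustration: key_range_for_thread and key_count are hypothetical names, and it assumes thread ids are zero-based, which the stub does not state.

```python
def key_range_for_thread(
    thread_id: int, total_thread_count: int, key_count: int
) -> range:
    """Return the contiguous slice of [0, key_count) owned by one thread.

    Hypothetical helper: assumes thread_id runs from 0 to
    total_thread_count - 1.
    """
    # Split as evenly as possible; the first `extra` threads get one more key.
    base, extra = divmod(key_count, total_thread_count)
    start = thread_id * base + min(thread_id, extra)
    stop = start + base + (1 if thread_id < extra else 0)
    return range(start, stop)


# Example: 10 keys shared by 3 threads.
slices = [list(key_range_for_thread(i, 3, 10)) for i in range(3)]
```

Inside setup(), such a range could be stored on self and used later when picking bind parameters.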

loop()

The loop() function is what dbworkload executes repeatedly. The list it returns defines the functions you want to execute.

    # the loop() function returns a list of functions
    # that dbworkload will execute, sequentially.
    # Once every func has been executed, loop() is re-evaluated.
    # This process continues until dbworkload exits.
    def loop(self):
        return [
            self.txn_0,
            self.txn_1,
            self.txn_2,
            self.txn_3,
        ]
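Because loop() is re-evaluated after each pass, it can return a different list each time. The sketch below illustrates that idea with a read/write split. LoopSketch, read_pct, the txn_* names, and run_once are all hypothetical; run_once is only a stand-in that mimics the behavior described in the comments above, not dbworkload's actual driver.

```python
import random


class LoopSketch:
    """Hypothetical workload whose loop() picks a path at random."""

    def __init__(self, read_pct: float = 0.5):
        self.read_pct = read_pct  # assumed share of read-only passes
        self.calls: list[str] = []  # records what ran, for illustration only

    def loop(self):
        # Return either the read path or the full order path.
        if random.random() < self.read_pct:
            return [self.txn_read]
        return [self.txn_new_order, self.txn_ref_data, self.txn_complete_order]

    def txn_read(self, conn):
        self.calls.append("read")

    def txn_new_order(self, conn):
        self.calls.append("new_order")

    def txn_ref_data(self, conn):
        self.calls.append("ref_data")

    def txn_complete_order(self, conn):
        self.calls.append("complete_order")


def run_once(workload, conn=None):
    """Stand-in driver: run every function from one loop() evaluation, in order."""
    for txn in workload.loop():
        txn(conn)


readers = LoopSketch(read_pct=1.0)
run_once(readers)
writers = LoopSketch(read_pct=0.0)
run_once(writers)
```

Setting read_pct to 1.0 or 0.0 forces one path or the other, which makes the behavior easy to check.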

Utility functions

Here the stub includes some commonly used functions, such as random_str, which generates a random string.

    #####################
    # Utility Functions #
    #####################
    def __think__(self, conn: psycopg.Connection):
        time.sleep(self.think_time)

    def random_str(self, size: int = 12):
        return (
            random.getrandbits(8 * size)
            .to_bytes(size, "big")
            .translate(self.tbl)
            .decode()
        )
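To see how random_str and the translation table from the class init fit together, here is the same logic as a standalone sketch. The module-level names TBL and random_str are used only for illustration; in the stub they live on the class as self.tbl and self.random_str.

```python
import random
import string

# Same 256-entry table as in the stub: every byte value 0..255
# is mapped to a character in a..z, 0..9, or A..Z.
TBL = bytes.maketrans(
    bytearray(range(256)),
    bytearray(
        [ord(b"a") + b % 26 for b in range(113)]
        + [ord(b"0") + b % 10 for b in range(30)]
        + [ord(b"A") + b % 26 for b in range(113)]
    ),
)


def random_str(size: int = 12) -> str:
    # Draw `size` random bytes in one call, then map each byte
    # to an alphanumeric character through the table.
    return random.getrandbits(8 * size).to_bytes(size, "big").translate(TBL).decode()


sample = random_str(25)
allowed = set(string.ascii_letters + string.digits)
```

The table has 113 + 30 + 113 = 256 entries, so letters are far more likely than digits and the distribution is not perfectly uniform. That is usually fine for generating test data.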

Workload transactions

Next are the stubs of the functions generated from the SQL statements in the bank.sql file.

This is the read operation:

    def txn_0(self, conn: psycopg.Connection):
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT *
                FROM orders
                WHERE acc_no = %s
                  AND id = %s
                """,
                (
                    # add bind parameter,
                    # add bind parameter, 
                ), 
            ).fetchall()

Below are the transactions related to the order:

    def txn_1(self, conn: psycopg.Connection):
        with conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO orders (acc_no, status, amount)
                VALUES (%s, 'Pending', %s) RETURNING id
                """,
                (
                    # add bind parameter,
                    # add bind parameter, 
                ), 
            ).fetchall()

    def txn_2(self, conn: psycopg.Connection):
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT *
                FROM ref_data
                WHERE acc_no = %s
                """,
                (
                    # add bind parameter, 
                ), 
            ).fetchall()

    def txn_3(self, conn: psycopg.Connection):
        with conn.cursor() as cur:
            cur.execute(
                """
                UPDATE orders
                SET status = 'Complete'
                WHERE (acc_no,
                       id) = (%s,
                              %s)
                """,
                (
                    # add bind parameter,
                    # add bind parameter, 
                ), 
            )
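Each "# add bind parameter" placeholder stands for one value in the tuple passed to cur.execute(), in the same order as the %s markers in the statement. As a rough sketch, the two parameters of the INSERT could be produced like this. order_params and max_acc_no are hypothetical, and the column types (an integer acc_no and a numeric amount) are assumptions, since the table definitions are not shown here.

```python
import random


def order_params(max_acc_no: int = 1000) -> tuple[int, float]:
    """Build the (acc_no, amount) tuple for the INSERT statement.

    Hypothetical helper: assumes acc_no is an integer column and
    amount is a numeric column.
    """
    acc_no = random.randint(0, max_acc_no)
    amount = round(random.random() * 1_000_000, 2)
    # Order matters: values are bound to the %s markers positionally.
    return (acc_no, amount)


params = order_params(max_acc_no=10)
```

The resulting tuple would take the place of the commented-out placeholders in txn_1.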

Helpful tips

Finally, the stub finishes with some tips on how to generate random data:

'''
# Quick random generators reminder

# random string of 25 chars
self.random_str(25),

# random int between 0 and 100k
random.randint(0, 100000),

# random float with 2 decimals 
round(random.random()*1000000, 2)

# now()
dt.datetime.utcnow()

# random timestamptz between certain dates,
# expressed as unix ts
dt.datetime.fromtimestamp(random.randint(1655032268, 1759232268))

# random UUID
uuid4()

# random bytes
size = 12
random.getrandbits(8 * size).to_bytes(size, "big")

'''
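One note on the timestamp tips: dt.datetime.utcnow() returns a naive datetime and has been deprecated since Python 3.12, and fromtimestamp() without a tz argument uses the machine's local time zone. A timezone-aware variant might look like the sketch below. random_timestamp is a hypothetical helper, and its default bounds are simply the Unix timestamps used in the tips.

```python
import datetime as dt
import random

# Timezone-aware replacement for dt.datetime.utcnow().
now_utc = dt.datetime.now(dt.timezone.utc)


def random_timestamp(
    start_ts: int = 1655032268, end_ts: int = 1759232268
) -> dt.datetime:
    """Return a random timezone-aware UTC datetime between two Unix timestamps."""
    return dt.datetime.fromtimestamp(
        random.randint(start_ts, end_ts), tz=dt.timezone.utc
    )


ts = random_timestamp()
```

Aware datetimes are generally the safer choice for timestamptz columns, because the value does not depend on the client's local time zone.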

In the next section, we customize the file to fit our workload.

Psycopg 3 basic usage

Now is a good time to review how the psycopg driver works. Fortunately, there is a great introduction on the official website 🚀