Create the workload class
We now create the Python class file that describes our workload.
This is a regular .py file, which we would otherwise need to write from scratch.
Fortunately, the built-in command dbworkload util gen_stub generates a skeleton that we can use as a starting point.
Create the stub file
Execute the following command:

    dbworkload util gen_stub -i bank.sql

A new file, bank.py, will be created in your directory.
Review the workload class file
The file is too long to paste here in one piece,
so we will go through it in sections.
Library imports
The first few lines define the imports:

    import datetime as dt
    import psycopg
    import random
    import time
    from uuid import uuid4

These are the libraries most commonly used in a workload class.
More can be added as needed.
Class init
Here is the definition of class Bank.

    class Bank:
        def __init__(self, args: dict):
            # args is a dict of strings passed with the --args flag
            # convert to float first, then from milliseconds to seconds
            self.think_time: float = float(args.get("think_time", 5)) / 1000

            # you can arbitrarily add any variables you want
            self.my_var = 1

            # translation table for efficiently generating a string
            # -------------------------------------------------------
            # make translation table from 0..255 to a..z, 0..9, A..Z
            # the length must be 256
            self.tbl = bytes.maketrans(
                bytearray(range(256)),
                bytearray(
                    [ord(b"a") + b % 26 for b in range(113)]
                    + [ord(b"0") + b % 10 for b in range(30)]
                    + [ord(b"A") + b % 26 for b in range(113)]
                ),
            )

The __init__ method includes some convenient examples of how to pass runtime arguments and set class variables.
self.tbl is used to generate random strings; we will review that in the following sections.
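As an illustration of how a runtime argument ends up in a class variable, here is a minimal sketch. It assumes the value may arrive either as a number or as a numeric string, and that it is expressed in milliseconds, as the division by 1000 in the stub suggests. The helper name parse_think_time is hypothetical and is not part of dbworkload.

```python
def parse_think_time(args: dict, default_ms: float = 5) -> float:
    """Return the think time in seconds from a value given in milliseconds."""
    # float() accepts both numbers and numeric strings such as "250",
    # so the conversion happens before the division
    return float(args.get("think_time", default_ms)) / 1000


# hypothetical inputs: no argument passed, then a string value
print(parse_think_time({}))
print(parse_think_time({"think_time": "250"}))
```

Converting before dividing matters: dividing a string by 1000 raises a TypeError.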
setup()
Next up is setup().
dbworkload invokes this function once for each thread, when the thread starts.

        # the setup() function is executed only once
        # when a new executing thread is started.
        # Also, the function is a vector to receive the executing thread's unique id and the total thread count
        def setup(self, conn: psycopg.Connection, id: int, total_thread_count: int):
            with conn.cursor() as cur:
                print(
                    f"My thread ID is {id}. The total count of threads is {total_thread_count}"
                )
                print(cur.execute("select version()").fetchone()[0])

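One possible use of the thread id and the total thread count is to give each thread its own slice of a key space, so that threads do not all work on the same rows. The sketch below is only an illustration: the function key_range and the parameter total_keys are hypothetical, and it assumes thread ids are zero-based, which this page does not state.

```python
def key_range(thread_id: int, total_thread_count: int, total_keys: int) -> range:
    """Return the contiguous slice of [0, total_keys) owned by one thread."""
    # integer arithmetic spreads any remainder across the threads
    start = thread_id * total_keys // total_thread_count
    end = (thread_id + 1) * total_keys // total_thread_count
    return range(start, end)


# hypothetical run: 4 threads sharing 10 keys
for thread_id in range(4):
    print(thread_id, list(key_range(thread_id, 4, 10)))
```

Inside setup(), the result could be stored on self and used later when choosing bind parameters.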
loop()
The loop() function is what dbworkload executes repeatedly.
In the returned list, you define the functions you want to execute.

        # the loop() function returns a list of functions
        # that dbworkload will execute, sequentially.
        # Once every func has been executed, loop() is re-evaluated.
        # This process continues until dbworkload exits.
        def loop(self):
            return [
                self.txn_0,
                self.txn_1,
                self.txn_2,
                self.txn_3,
            ]

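Because loop() is re-evaluated after every cycle, it can return a different list each time. The following sketch uses that to mix read-only and write cycles. It is a stand-in class with no-op transactions rather than the generated Bank class, and the read_pct knob and the method names are hypothetical.

```python
import random


class LoopSketch:
    """Stand-in class with no-op transactions; not the generated Bank class."""

    def __init__(self, read_pct: float = 0.9):
        # hypothetical knob: share of cycles that only read
        self.read_pct = read_pct

    def txn_read(self, conn=None):
        pass

    def txn_new_order(self, conn=None):
        pass

    def txn_complete_order(self, conn=None):
        pass

    def loop(self):
        # evaluated once per cycle, so each cycle can pick a different list
        if random.random() < self.read_pct:
            return [self.txn_read]
        return [self.txn_new_order, self.txn_complete_order]


workload = LoopSketch(read_pct=0.9)
print([func.__name__ for func in workload.loop()])
```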
Utility functions
The stub also includes some commonly used helper functions, such as random_str, which generates a random string of the given size.

        #####################
        # Utility Functions #
        #####################
        def __think__(self, conn: psycopg.Connection):
            time.sleep(self.think_time)

        def random_str(self, size: int = 12):
            return (
                random.getrandbits(8 * size)
                .to_bytes(size, "big")
                .translate(self.tbl)
                .decode()
            )

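To see how the translation table and random_str work together, here is a standalone sketch that rebuilds the same table outside the class. Each random byte (0..255) is used as an index into the 256-entry table, so every byte becomes one alphanumeric character. The module-level names tbl and random_str mirror the stub but are written here as plain functions for illustration.

```python
import random

# same construction as the stub: 113 + 30 + 113 = 256 entries
tbl = bytes.maketrans(
    bytearray(range(256)),
    bytearray(
        [ord(b"a") + b % 26 for b in range(113)]
        + [ord(b"0") + b % 10 for b in range(30)]
        + [ord(b"A") + b % 26 for b in range(113)]
    ),
)


def random_str(size: int = 12) -> str:
    # draw `size` random bytes in one call, then map each byte through the table
    raw = random.getrandbits(8 * size).to_bytes(size, "big")
    return raw.translate(tbl).decode()


sample = random_str(25)
print(sample, len(sample))
```

Note that 226 of the 256 byte values map to letters and only 30 map to digits, so letters appear far more often than digits in the output.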
Workload transactions
Next are the stubs of the functions generated from the SQL statements in the bank.sql file.
This is the read operation:

        def txn_0(self, conn: psycopg.Connection):
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM orders
                    WHERE acc_no = %s
                      AND id = %s
                    """,
                    (
                        # add bind parameter,
                        # add bind parameter,
                    ),
                ).fetchall()

Below are the remaining transactions, which insert a new order, read the reference data, and mark the order as complete:

        def txn_1(self, conn: psycopg.Connection):
            with conn.cursor() as cur:
                cur.execute(
                    """
                    INSERT INTO orders (acc_no, status, amount)
                    VALUES (%s, 'Pending', %s) RETURNING id
                    """,
                    (
                        # add bind parameter,
                        # add bind parameter,
                    ),
                ).fetchall()

        def txn_2(self, conn: psycopg.Connection):
            with conn.cursor() as cur:
                cur.execute(
                    """
                    SELECT *
                    FROM ref_data
                    WHERE acc_no = %s
                    """,
                    (
                        # add bind parameter,
                    ),
                ).fetchall()

        def txn_3(self, conn: psycopg.Connection):
            with conn.cursor() as cur:
                cur.execute(
                    """
                    UPDATE orders
                    SET status = 'Complete'
                    WHERE (acc_no,
                           id) = (%s,
                                  %s)
                    """,
                    (
                        # add bind parameter,
                        # add bind parameter,
                    ),
                )

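Each `# add bind parameter` comment marks one value that must be supplied, in the same order as the %s placeholders in the statement. As a rough sketch of what such a tuple could look like for the INSERT in txn_1, the hypothetical helper below builds the two values. The value ranges are assumptions made for illustration, since the table definitions are not shown in this section.

```python
import random


def new_order_params(max_acc_no: int = 1000) -> tuple:
    """Build the two bind parameters for the INSERT in txn_1, in placeholder order."""
    acc_no = random.randint(0, max_acc_no)          # first %s: acc_no (assumed range)
    amount = round(random.random() * 1_000_000, 2)  # second %s: amount (assumed range)
    return (acc_no, amount)


print(new_order_params())
```

The number of values in the tuple must match the number of placeholders, so the SELECT in txn_2 takes a one-element tuple such as `(acc_no,)`.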
Helpful tips
Finally, the stub finishes with some tips on how to generate random data:

    '''
    # Quick random generators reminder

    # random string of 25 chars
    self.random_str(25),

    # random int between 0 and 100k
    random.randint(0, 100000),

    # random float with 2 decimals
    round(random.random()*1000000, 2)

    # now()
    dt.datetime.utcnow()

    # random timestamptz between certain dates,
    # expressed as unix ts
    dt.datetime.fromtimestamp(random.randint(1655032268, 1759232268))

    # random UUID
    uuid4()

    # random bytes
    size = 12
    random.getrandbits(8 * size).to_bytes(size, "big")
    '''

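The reminders above can be tried outside the class with a short script such as the one below. This sketch departs from the stub in one respect: it uses timezone-aware datetimes, because dt.datetime.utcnow() is deprecated as of Python 3.12. The samples dictionary and its keys exist only for this illustration.

```python
import datetime as dt
import random
from uuid import uuid4

size = 12
samples = {
    # random int between 0 and 100k
    "int": random.randint(0, 100000),
    # random float with 2 decimals
    "float": round(random.random() * 1000000, 2),
    # current time as a timezone-aware UTC datetime
    "now": dt.datetime.now(dt.timezone.utc),
    # random timestamp between two dates expressed as unix timestamps
    "timestamp": dt.datetime.fromtimestamp(
        random.randint(1655032268, 1759232268), tz=dt.timezone.utc
    ),
    # random UUID
    "uuid": uuid4(),
    # random bytes
    "bytes": random.getrandbits(8 * size).to_bytes(size, "big"),
}

for name, value in samples.items():
    print(f"{name:10} {value!r}")
```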
In the next section, we customize the file to fit our workload.
Psycopg 3 basic usage
Now might be a good time to refresh how the psycopg driver works.
Fortunately, there is a great intro doc on the official website 🚀