Edit the workload
Ok, we have only a few changes to make to our stub file so that it matches our intended workload.
Read Transaction
The SELECT statement requires us to pass acc_no
and id
.
acc_no
is a number between 0 and 999.
This matches the data we generated for table ref_data
.
id
is a UUID and it is supposed to be generated by the database upon insertion.
So how can we pass a valid (acc_no, id)
to our query?
We can keep track of, say, 10,000 values (tuples) into a list, and pick from it randomly.
The list will be populated as we insert new orders into the database, so after a few 1000's iterations,
our list will be full and the SELECT will pass existing values.
A convenient Python object that can help is the deque, which allows us to define a max length.
We also need a way to control how many read transactions we want to do compared to order transactions. For this, we pass an argument at runtime.
Add required library
import datetime as dt
import psycopg
import random
import time
from uuid import uuid4
from collections import deque
Add variables and runtime argument
We add here the runtime argument read_pct
and 2 variables:
- the deque object called
order_tuples
- the
account_number
andid
needed for the Order transaction.
class Bank:
def __init__(self, args: dict):
# args is a dict of string passed with the --args flag
self.think_time: float = float(args.get("think_time", 5) / 1000)
# Runtime argument to control read vs order txns
# Percentage of read operations compared to order operations
self.read_pct: float = float(args.get("read_pct", 50) / 100)
# initiate deque with a random tuple so a read won't fail
self.order_tuples = deque([(0, uuid4())], maxlen=10000)
# keep track of the current account number and id
self.account_number = 0
self.id = uuid4()
# translation table for efficiently generating a string
# -------------------------------------------------------
# make translation table from 0..255 to A..Z, 0..9, a..z
# the length must be 256
self.tbl = bytes.maketrans(
bytearray(range(256)),
bytearray(
[ord(b"a") + b % 26 for b in range(113)]
+ [ord(b"0") + b % 10 for b in range(30)]
+ [ord(b"A") + b % 26 for b in range(113)]
),
)
Add bind parameters to read function
We rename function txn_0
to something more descriptive, and add the expected bind parameter.
def txn_read(self, conn: psycopg.Connection):
with conn.cursor() as cur:
cur.execute(
"""
SELECT *
FROM orders
WHERE acc_no = %s
AND id = %s
""",
random.choice(self.order_tuples),
).fetchall()
That's it! Our read operation is complete 🚀.
Order transaction
The order transaction is made up of 2 distinct database transactions:
- new_order
- order_exec
New Order
This is the first of the 2 transactions of our Order workload.
Rename function txn_1
to txn_new_order
, then add the required bind parameters.
We then save the returning id
along with the acc_no
to our tuple deque.
def txn_new_order(self, conn: psycopg.Connection):
# generate a random account number to be used for
# for the order transaction
self.account_number = random.randint(0, 999)
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO orders (acc_no, status, amount)
VALUES (%s, 'Pending', %s) RETURNING id
""",
(
self.account_number,
round(random.random() * 1000000, 2),
),
)
# save the id that the server generated
self.id = cur.fetchone()[0]
# save the (acc_no, id) tuple to our deque list
# for future read transactions
self.order_tuples.append((self.account_number, self.id))
Order Execution
The order execution transaction is an explicit transaction.
Rename function txn_2
accordingly, then add bind parameters as usual.
def txn_order_exec(self, conn: psycopg.Connection):
# this is how you start an explicit transaction with psycopg
with conn.transaction() as tx:
with conn.cursor() as cur:
cur.execute(
"""
SELECT *
FROM ref_data
WHERE acc_no = %s
""",
(
self.account_number,
),
).fetchall()
# simulate microservice doing something...
time.sleep(0.02)
cur.execute(
"""
UPDATE orders
SET status = 'Complete'
WHERE
(acc_no, id) = (%s, %s)
""",
(
self.account_number,
self.id,
),
)
Update loop()
The final change is to update the loop()
function to return either a read txn or an order txn.
def loop(self):
if random.random() < self.read_pct:
return [self.txn_read]
return [self.txn_new_order, self.txn_order_exec]
Full class file bank.py
For completeness, here is the full edited file, minus the helpful tips
import datetime as dt
import psycopg
import random
import time
from uuid import uuid4
from collections import deque
class Bank:
def __init__(self, args: dict):
# args is a dict of string passed with the --args flag
self.think_time: float = float(args.get("think_time", 5) / 1000)
# Percentage of read operations compared to order operations
self.read_pct: float = float(args.get("read_pct", 0) / 100)
# initiate deque with 1 random UUID so a read won't fail
self.order_tuples = deque([(0, uuid4())], maxlen=10000)
# keep track of the current account number and id
self.account_number = 0
self.id = uuid4()
# translation table for efficiently generating a string
# -------------------------------------------------------
# make translation table from 0..255 to A..Z, 0..9, a..z
# the length must be 256
self.tbl = bytes.maketrans(
bytearray(range(256)),
bytearray(
[ord(b"a") + b % 26 for b in range(113)]
+ [ord(b"0") + b % 10 for b in range(30)]
+ [ord(b"A") + b % 26 for b in range(113)]
),
)
# the setup() function is executed only once
# when a new executing thread is started.
# Also, the function is a vector to receive the excuting threads's unique id and the total thread count
def setup(self, conn: psycopg.Connection, id: int, total_thread_count: int):
with conn.cursor() as cur:
print(
f"My thread ID is {id}. The total count of threads is {total_thread_count}"
)
print(cur.execute(f"select version()").fetchone()[0])
# the loop() function returns a list of functions
# that dbworkload will execute, sequentially.
# Once every func has been executed, loop() is re-evaluated.
# This process continues until dbworkload exits.
def loop(self):
if random.random() < self.read_pct:
return [self.txn_read]
return [self.txn_new_order, self.txn_order_exec]
#####################
# Utility Functions #
#####################
def __think__(self, conn: psycopg.Connection):
time.sleep(self.think_time)
def random_str(self, size: int = 12):
return (
random.getrandbits(8 * size)
.to_bytes(size, "big")
.translate(self.tbl)
.decode()
)
# Workload function stubs
def txn_read(self, conn: psycopg.Connection):
with conn.cursor() as cur:
cur.execute(
"""
SELECT *
FROM orders
WHERE acc_no = %s
AND id = %s
""",
random.choice(self.order_tuples),
).fetchall()
def txn_new_order(self, conn: psycopg.Connection):
# generate a random account number to be used for
# for the order transaction
self.account_number = random.randint(0, 999)
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO orders (acc_no, status, amount)
VALUES (%s, 'Pending', %s) RETURNING id
""",
(
self.account_number,
round(random.random() * 1000000, 2),
),
)
# save the id that the server generated
self.id = cur.fetchone()[0]
# save the (acc_no, id) tuple to our deque list
# for future read transactions
self.order_tuples.append((self.account_number, self.id))
def txn_order_exec(self, conn: psycopg.Connection):
# with Psycopg, this is how you start an explicit transaction
with conn.transaction() as tx:
with conn.cursor() as cur:
cur.execute(
"""
SELECT *
FROM ref_data
WHERE acc_no = %s
""",
(
self.account_number,
),
).fetchall()
# simulate microservice doing something...
time.sleep(0.02)
cur.execute(
"""
UPDATE orders
SET status = 'Complete'
WHERE
(acc_no, id) = (%s, %s)
""",
(
self.account_number,
self.id,
),
)
We are now finally ready to run our workload!