One of the advantages of the Solana blockchain over other technologies is the speed with which new blocks can be produced. On the mainnet blockchain this happens roughly every 0.7 seconds, and on the devnet blockchain it is even faster, with a new block being produced every 0.4 seconds. The app that we are currently developing takes advantage of this by using the block production rate to set its 'heartbeat'. Every time a new block is produced, the state of the off-chain component of the app updates, and users can decide how the state will update by interacting with an on-chain program. Even if no one uses the program within a given block, however, the state of the off-chain app will still update (imagine any video game where, even if you aren't pressing buttons, time is still passing in the game world).
We therefore need to be able to monitor every block that is validated, check whether any transactions within that block interacted with our on-chain program, and then save the result in a database. The off-chain app can then use this database to update its state in real time as each new block is produced.
One key requirement of our monitoring system is that, when running live and producing the database in real time, it should yield exactly the same result as someone who simply downloads the data from the blockchain months or years later, when all the blocks are available historically, and builds their own database. Both the on- and off-chain apps will be available for anyone to run themselves, and are being designed so that anyone using the same starting block should be able to retrieve the history of the chain from that point and arrive at the same current state, in order to verify that everything is working as advertised. We therefore have to ensure that the events in the live database will never be out of order due to the asynchronous nature of requesting information from the blockchain.
In this post we will describe the approach we have taken, the code for which is available in the python directory of our GitHub repo for this post here. There is also a Rust implementation of a program that we are running on the Solana devnet, and an example client that will allow you to test the monitor code yourself, both of which we will describe at the bottom of this post.
The flow of the monitoring program is as follows:
Establish connections to our SQL database and Solana RPC endpoint
Determine the correct starting state
Request the set of finalized blocks from the last known entry up to the most recently validated block
Parse the data in those blocks and add new entries to the database
Repeat the last two steps indefinitely
We will now go through each of these tasks in detail.
Getting Connected
The main loop for our monitoring system is found in streamer.py, which starts by establishing a connection to the SQL database:
# in streamer.py

db_conn = create_database_connection()

# check the connection is valid
if db_conn is None:
    print("Error! cannot create the database connection.")
    exit()
...
The create_database_connection function is shown in full below:
# in sql_funcs.py
# setup the connection to the database and create the table if required
def create_database_connection():
    """ create a database connection to the SQLite database """
Here we are making use of the Python sqlite3 module to manage our database, which we have called solana_block_data.db, and use the connect function to open the connection. Note that we are setting the isolation_level to None here, which means that we will be explicitly controlling the start and end of the transactions that add rows to our database, rather than having the Python module handle this in the background for us. If for some reason this fails, the function will return None and the main code in streamer.py will immediately exit with an error. If it succeeds it will call the create_table function shown below:
# in sql_funcs.py
# will create a table with the structure defined in the create table instruction if
# it does not already exist
def create_table(conn):
    """ create a table from the create_table_sql statement
    :param conn: Connection object
    :return:
    """
    create_table_idx = """ CREATE TABLE IF NOT EXISTS block_data (
        id int PRIMARY KEY,
        block_slot int NOT NULL,
        choice string NOT NULL,
        bid_amount int NOT NULL); """

    try:
        c = conn.cursor()
        c.execute(create_table_idx)
    except Error as e:
        print(e)
        return False

    return True
The inclusion of the IF NOT EXISTS clause in create_table_idx ensures that this instruction will only actually do anything if the table doesn't already exist in the database. In that case it will create a table with four columns: an id, which is simply the row index and provides a unique identifier for each row; block_slot, which is the slot number for a particular block; and finally choice and bid_amount, which are the quantities that users can pass to our program that we want to keep track of, and which will determine the evolution of our off-chain application. If for some reason this process fails, the function will return False and the main program will exit immediately with an error.
Assuming this connection has been established correctly, we then also connect to our QuickNode endpoint so that we can start making RPC requests.
# in streamer.py

# connect to solana endpoint
quick_node_dev = "MY_QUICK_NODE"
dev_client = Client(quick_node_dev)

if (not dev_client.is_connected()):
    print("Error! cannot connect to quicknode endpoint.")
    exit()
...
As we will show later, this monitoring process uses a lot of requests, so we don't recommend trying to use the public endpoints as you will find yourself kicked off very quickly!
Determining The Current State
The next step is to initialize the current state of the monitoring server, which will happen differently when we are starting fresh and the database is empty, compared to when we are restarting the monitoring server and the database already exists.
To find out which state we are in we use the function get_last_db_row:
# in streamer.py
...
current_row_id_to_insert = None
current_block = None

last_db_row = get_last_db_row(db_conn)
...
This function, shown below, creates an SQL query that will return the row that has the maximum value of id in the database. If there are no entries it will return None, and otherwise it will return the row. Note this is a very slow function to call, and this is the only time that we make use of it. Typically we will be tracking the current row id using the current_row_id_to_insert variable in streamer.py, and incrementing it as we iterate through the main loop.
# in sql_funcs.py
# returns the last row in the database, or None if it is empty
def get_last_db_row(conn):

    # get the row that has the maximum value of id
    # this returns a vector that has the shape [row, max_id]
    # so we only return the first N_COLS=4 values
    cur = conn.cursor()
    cur.execute("SELECT *, max(id) FROM block_data")
    r = cur.fetchone()

    if (r[0] == None):
        return None

    return r[:N_COLS]
If there were already entries present then we can simply use the row id and block number from that row as the starting point for the main loop:
# in streamer.py
...
if (last_db_row != None):
    print("getting current_block from DB: ")
    print(last_db_row)
    current_row_id_to_insert = last_db_row[0] + 1
    current_block = last_db_row[1]
...
If the database doesn't currently exist and get_last_db_row returned None, then we will set the current row id to zero, and use the get_slot function to retrieve the current slot number being worked on. A quick note here about blocks and slots: every block that is confirmed on the Solana blockchain has a corresponding slot, and in that case the slot number and block number tend to be used interchangeably. Not all slots, however, have a block (we will come back to this later), and when using get_slot you can pass a commitment argument to specify how certain you want to be that the slot returned will actually have a block associated with it. By default this commitment level is set to finalized, which means the slot will definitely have a block.
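For illustration only (this snippet is not part of streamer.py, and assumes the same solana-py Client used above), you could request a less certain slot by passing a lower commitment level:
from solana.rpc.commitment import Confirmed

# a 'confirmed' slot is more recent, but unlike a finalized one it is not yet
# guaranteed to remain part of the chain
confirmed_slot = dev_client.get_slot(Confirmed)["result"]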
# in streamer.py
...
else:
    print("getting current_block from client")
    current_row_id_to_insert = 0
    current_block = get_slot(dev_client)

print("Starting with row: ", current_row_id_to_insert, " Current block: ", current_block)
...
The get_slot function is the first example in this post of a call to the Solana JSON RPC API. Two of the three requests we make follow this same format (the other being get_blocks, which we will use in the next section). Because the request to the API is asynchronous, there is no guarantee that it will return successfully; it may time out, or some other problem may occur while processing the request. We therefore set up a while loop that tries to get a response, and catches any errors in order to simply try again after a short wait (0.25 seconds in our example).
# wait time between retries (0.25 seconds in our example)
sleep_time = 0.25

# returns the current slot
def get_slot(dev_client):

    while True:
        try:
            slot = dev_client.get_slot()
        except:
            print("get_slot transaction request timed out")
            time.sleep(sleep_time)
            continue

        if (not check_json_result("get_slot", slot)):
            time.sleep(sleep_time)
            continue

        break

    return slot["result"]
The responses from these requests should include either a result node, or an error node indicating that something has gone wrong with the request. Sometimes, however, a problem may occur when sending or receiving the request which causes neither of these to be present. We therefore define a simple helper function check_json_result to return True if the result node is present, and otherwise return False, logging the error if it was present:
def check_json_result(id, json_result):

    if ("result" in json_result.keys()):
        return True

    if ("error" in json_result.keys()):
        error = json_result["error"]
        print(id, " returned error: ", error)

    return False
At this point we now have everything we need to initialize the current state of our monitor, and can enter the main loop that will take care of actually requesting the block data needed for our database.
The Monitoring Loop
We now enter the main loop, which consists of three main steps:
request the set of finalized block numbers since the last iteration
request the data for those blocks
process that data and store it in our database
We will go through each of these in turn below.
Getting The Next Finalized Block Numbers
Unlike on other blockchains such as Ethereum, slots on the Solana blockchain can be 'skipped', meaning that no block is produced for them. When trying to get the set of blocks that we should request, we want to be able to ignore these skipped slots, and the RPC API provides the function get_blocks to do precisely that. We just pass as an argument the most recent valid block number that we know about, and it will return the list of finalized blocks from that number up to the most recent.
As with get_slot, we wrap the get_blocks function in a while loop within our get_block_list function, shown below. We also use our check_json_result function again here to verify that the response is as expected, and in this case we also check that the list of blocks is not empty.
# returns the list of finalized blocks after and including block_idx
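# (the body of get_block_list is not reproduced in the post text; the following
#  is a minimal sketch, assuming the same retry pattern, sleep_time and
#  check_json_result helper used in get_slot above)
def get_block_list(dev_client, block_idx):

    while True:
        try:
            block_list = dev_client.get_blocks(block_idx)
        except:
            print("get_blocks transaction request timed out")
            time.sleep(sleep_time)
            continue

        if (not check_json_result("get_blocks", block_list)):
            time.sleep(sleep_time)
            continue

        # we also want at least one block in the list before continuing
        if (len(block_list["result"]) == 0):
            time.sleep(sleep_time)
            continue

        break

    return block_list["result"]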
Within the main loop of our monitor, we check if the last entry of the list returned by get_block_list is the same as current_block. If it is then we know that no new blocks have been finalized, and so we simply wait a short time and then check again. Once we have new blocks within the list we simply remove the first entry and proceed to the next step.
...
while (True):

    # get all the blocks after and including current_block
    block_list = get_block_list(dev_client, current_block)

    # if the last block in the list was the current block, just wait and check again shortly
    if (block_list[-1] == current_block):
        time.sleep(0.05)
        continue

    # we are only interested in the blocks after current_block so remove that one from the list
    block_list = block_list[1:]
    ...
Getting the Blocks
Unlike the previous two RPC requests, in this case we want to be able to make multiple requests to the get_block RPC function. As such we will create a batch request and use the python requests module to post that to our endpoint. We handle creating the batch requests in the make_blocks_batch_request function in rpc_funcs.py.
Here dev_client_url is the URL of our RPC endpoint and block_list is the list of slots that we received from get_block_list previously. In order to track the success of each request within the batch we use have_block, which is a vector of bools with the same length as block_list, and finally blocks is a map that will contain the responses for each block within the batch.
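The definition line itself isn't reproduced in the snippets below, but based on that description it presumably looks something like this (an assumption on our part about the exact signature):
def make_blocks_batch_request(dev_client_url, block_list, have_block, blocks):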
The format for these requests can be found here, and requires us to create a json object with the correct headers and formatting. When submitting a batch request we only require a single header, and the individual requests are simply appended together into a vector, with each one getting its own unique id within the batch.
...
headers = CaseInsensitiveDict()
headers["Content-Type"] = "application/json"

request_vec = []
for i in range(len(block_list)):

    if (have_block[i]):
        continue

    new_request = json.loads("""{
        "jsonrpc": "2.0",
        "id": 0,
        "method": "getBlock",
        "params": [0,
            {
                "encoding": "json",
                "transactionDetails": "full",
                "rewards": false,
                "maxSupportedTransactionVersion": 0
            }
        ]
    }""")

    new_request["id"] = i + 1
    new_request["params"][0] = block_list[i]
    request_vec.append(new_request)
The header contains only a single entry for Content-Type, which must be set to application/json. We define a template request using new_request, to which we just assign a default id and slot of zero. Most of the settings in the params node are straightforward, though the maxSupportedTransactionVersion setting is a relatively new addition and is required to support blocks that contain transactions using the v0 message type, as opposed to only "legacy", which is still the default.
For each block in the list we use this template to create a new request to getBlock, set the slot appropriately, and increment the id number, before adding it to the request_vec vector.
With the batch request constructed we can then post it to our endpoint using the requests module. As with the standard RPC API requests, we still need to handle cases where the request times out or some other error occurs, so as before we wrap the post method in a while loop that retries until a response has been received. If the request has returned successfully it will have a status_code of 200, so we check this and simply return out of the function if that is not the case (we will handle this in the next function), and otherwise convert the response into a json object to make it easier to parse.
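That posting and retry logic isn't reproduced in the snippets here; a minimal sketch of it, assuming requests has been imported at the top of rpc_funcs.py and the same sleep_time retry interval as before, might look like this:
while True:
    try:
        resp = requests.post(dev_client_url, headers=headers, json=request_vec)
    except:
        print("getBlock batch request timed out")
        time.sleep(sleep_time)
        continue
    break

# anything other than a 200 status code means the batch failed, so return and
# let the caller request the missing blocks again
if (resp.status_code != 200):
    return have_block, blocks

resp_json = resp.json()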
...
for response in resp_json:

    if ("id" not in response.keys()):
        continue

    if ("result" not in response.keys()):
        continue

    id = response["id"]
    blocks[block_list[id - 1]] = response["result"]
    have_block[id - 1] = True

return have_block, blocks
This response object will be a vector whose length is the number of blocks requested, where each entry is the response for a specific block. Any one of these could in principle have failed for some reason, and so we iterate through the list, checking whether there is a result node. If there is, we can insert it into our blocks map using the slot number as the key, and mark the corresponding entry in the have_block vector as True so that we know this block has been received.
This function now returns the map, and the vector which denotes which of the blocks has been successfully requested. We handle the case where only a subset of the desired blocks have been received in the get_one_block_batch function which we show below.
This function is simply responsible for repeatedly calling make_blocks_batch_request until the whole of the have_block vector of bools has been set to True. For each iteration in the loop it will pass the current state both of this vector and of the blocks map back to make_blocks_batch_request, which will only send requests for the blocks that are still missing. Once it has finished requesting all the blocks in the batch it then returns the map.
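The function itself is short; a minimal sketch of it, based on that description (the exact bookkeeping in the repo may differ slightly), looks something like this:
# request a single batch of blocks, retrying any that fail until we have them all
def get_one_block_batch(dev_client_url, batch_block_list):

    have_block = [False] * len(batch_block_list)
    blocks = {}

    # keep re-requesting until every block in the batch has been received
    while (not all(have_block)):
        have_block, blocks = make_blocks_batch_request(dev_client_url, batch_block_list, have_block, blocks)

    return blocks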
Although in principle these functions could deal with batches of arbitrary size, once they reach a few thousand blocks the endpoint can start to become unresponsive as too many are submitted in a single request. Although in typical use we will only need to request small numbers of blocks at a time, if for some reason the monitoring server goes down for a period of hours, there can be many thousands of blocks that will need to be requested.
We therefore have one final layer where we take the initial block_list that is returned in the main loop, and break it up into chunks of one hundred blocks, and then process each of these chunks separately. This is done in the get_blocks function, which we will now describe below.
# in rpc_funcs.py
# Returns identity and transaction information about a confirmed block in the ledger
def get_blocks(dev_client_url, block_list):

    n_blocks = len(block_list)
    batch_size = 100
    # only submit max 100 requests in one go. At some point this will start to timeout if too many are sent
The first thing we need to do in this function is determine the number of batches we will have to process given our batch_size of one hundred. If this is only one, as will typically be the case, we can just call get_one_block_batch directly and no more needs to be done.
    ...
    else:
        print("requesting ", n_batches, " with total ", n_blocks, " blocks")
If we have more than one batch then we create a vector, batch_lists, that contains the slot numbers that each batch will be responsible for. For each entry in batch_lists we can then call get_one_block_batch, and build up a single blocks map from the results of each batch. These requests are managed by Python's ThreadPoolExecutor, which can submit multiple batches in parallel and then process the results as they complete. At least with the endpoint we are using, we have found it more reliable and faster to maintain a running pool of ten parallel requests, each for 100 blocks, than to submit batches of one thousand blocks in serial. This may well depend on the endpoint, however, so if you are implementing something like this yourself you may want to test a range of batch and pool sizes.
        ...
        max_threads = 10
        with cf.ThreadPoolExecutor(max_threads) as executor:
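            # (sketch of the rest of this block, based on the description above;
            #  the exact bookkeeping in the repo may differ)
            # submit each batch of (up to) one hundred blocks to the thread pool
            futures = [executor.submit(get_one_block_batch, dev_client_url, batch) for batch in batch_lists]

            # merge the results of each batch into a single map as they complete
            blocks = {}
            for future in cf.as_completed(futures):
                blocks.update(future.result())

    return blocks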
Returning to the main loop, we simply call the above function after retrieving our list of valid slots:
# in streamer.py
...
# request all the blocks in block_list from the endpoint
blocks = get_blocks(quick_node_dev, block_list)
...
Get Block Data
Each block that we requested has a transactions node, which contains a list of all the transactions that were included in that block. The structure of these transactions can be seen here. The final stage in the monitoring loop is to process our newly downloaded blocks, and record to the database either that no interactions with our on-chain program happened within these transactions, or what those interactions were.
We do this for a single block in our get_data_from_block function, which we will go through below.
# get the block and process it
def get_data_from_block(block_idx, block):

    data_vec = []
    program = "H73oSXtdJfuBz8JWwdqyG92D3txMqxPEhAhT23T8eHf5"
Everything that we are interested in is found in the message node that exists within each transaction. This contains an instructions node which lists the instructions executed in that transaction, and an accountKeys node which lists the public keys used during the transaction, including the programs that were called. Within each instruction there is a programIdIndex, which gives the index into accountKeys that provides the program public key that executed the instruction.
In order to find which instructions are relevant to our program, we simply iterate through all the instructions within each transaction and compare the program public key given by programIdIndex with the public key of our program.
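That loop isn't reproduced in full in the snippets here; a minimal sketch of it, based on the description above (the variable names and exact field handling in the repo may differ), might look like the following, with the data handling continuing in the snippet below:
    for transaction in block["transactions"]:

        # everything we need is in the transaction's message node
        tx_message = transaction["transaction"]["message"]
        account_keys = tx_message["accountKeys"]

        for instruction in tx_message["instructions"]:

            # compare the program that executed this instruction with our program's public key
            if (account_keys[instruction["programIdIndex"]] != program):
                continue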
            ...
            if ("data" not in instruction.keys()):
                continue

            data = instruction["data"]
            decoded_data = base58.b58decode(data)
            ...
Once we have found an instruction of interest, we can access its data node, which is encoded as a base58 string. We can then use the base58 python module to decode this into a byte array, and use the borsh_construct module to convert that into a human-readable data structure. This final step, however, requires knowledge of the data structures that the program expects to be passed.
In the program source code for this example we can see the definition of the instructions that our on-chain program will accept. In this case there is only one, and it takes a ChoiceData structure as an argument.
// in instruction.rs
pub enum ChoiceInstruction {
    MakeChoice {
        choice_data: ChoiceData
    }
}
The ChoiceData structure simply contains a Choice enum, and an unsigned 64-bit integer representing a quantity.
// in state.rs
pub enum Choice {
    A,
    B,
    C,
    D
}

pub struct ChoiceData {
    pub choice : Choice,
    pub bid_amount : u64
}
We can represent these same structures in python using borsh_construct:
from borsh_construct import CStruct, Enum, U64

# the enum listing available choices
choice_type = Enum(
    "A",
    "B",
    "C",
    "D",
    enum_name = "Choice"
)

# the structure that MakeChoice expects containing a choice and a bid amount
ChoiceData = CStruct(
    "choice" / choice_type,
    "bid_amount" / U64
)

# enum of instructions the program can be sent
message = Enum(
    "MakeChoice" / CStruct("choice_data" / ChoiceData),
    enum_name = "ChoiceInstruction"
)
These objects allow us to try and convert a byte array into the given structure type using the parse function:
            ...
            try:
                args = message.parse(decoded_data)
            except:
                print("unable to parse data", decoded_data)
                continue

            if (not isinstance(args, message.enum.MakeChoice)):
                print("Have data but not a MakeChoice:", args)
                continue

            data_vec.append(args)

    return block_idx, data_vec
If this is successful we can then also check that the instruction is of the type we are interested in using the isinstance function. In this example this is trivially true because the program only accepts a single instruction, however in the more general case you may want to distinguish between different program instructions, and only save data for a subset. If we are interested in the data then we append it to the data_vec, which we then return when we are done parsing all the instructions in the block.
At this point we are almost done! Given a vector of instruction data we convert that into the correct format for our database with the create_rows_from_data function:
# in rpc_funcs.py
# create the rows for the database from the block data
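# (the body of create_rows_from_data is not reproduced in the post text; the
#  following is a minimal sketch based on the description that follows, and the
#  exact way the parsed borsh fields are accessed in the repo may differ)
def create_rows_from_data(row_id, block_idx, data_vec, rows_vec):

    # an empty block is recorded as a single 'no_choice' row with amount 0
    if (len(data_vec) == 0):
        new_row = (row_id, block_idx, "no_choice", 0)
        print("adding row: ", new_row)
        rows_vec.append(new_row)
        return row_id + 1

    # otherwise add one row per MakeChoice instruction found in the block
    for args in data_vec:
        choice_data = args.choice_data
        new_row = (row_id, block_idx, str(choice_data.choice), choice_data.bid_amount)
        print("adding row: ", new_row)
        rows_vec.append(new_row)
        row_id += 1

    return row_id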
Here we denote an empty block as a row with no_choice and amount 0, and otherwise access the choice and bid_amount fields from our ChoiceData structure to create each row, incrementing the row_id as we go. These rows can then be added to our database with the insert_rows function in sql_funcs.py:
# insert a set of rows into the table within a single transaction
def insert_rows(conn, rows):
    """
    Create new entries in the block_data table
    :param conn: Connection object
    :param rows: the rows to insert
    :return:
    """
    sql = ''' INSERT INTO block_data(id,block_slot,choice,bid_amount)
              VALUES(?,?,?,?) '''

    cur = conn.cursor()
    cur.execute("begin")
    for row in rows:
        cur.execute(sql, row)
    cur.execute("commit")
Here we manually initiate a new database transaction, insert all the rows within the block, and then commit the transaction. This ensures that all the data for a single block gets committed atomically, and we don't have a situation where the off-chain program looks up the data from a block and happens to check the database in a state where only part of the data for a block has been committed.
As with requesting the block data, if we are just processing a single block within a particular iteration of the main loop then these functions are just called directly. Otherwise if there are multiple blocks to be processed we once again use the concurrent.futures python module to multithread the processing and create all the new rows in parallel. Once all the blocks within an iteration have been processed the complete set of new rows are then added to the database in one go in the correct order, as shown below:
...
rows_to_insert = []

# if there is only one block in the list we don't need to do any multithreading, just get the transactions and process them
if (len(block_list) == 1):
    b_idx, data = get_data_from_block(block_list[0], blocks[block_list[0]])
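    # (sketch of the elided part of this snippet, based on the description above;
    #  the exact helper calls in the repo may differ)
    current_row_id_to_insert = create_rows_from_data(current_row_id_to_insert, b_idx, data, rows_to_insert)

else:
    # process the blocks in parallel, then create the rows in block order so the
    # live database always matches one rebuilt later from the historical chain
    with cf.ThreadPoolExecutor(10) as executor:
        futures = [executor.submit(get_data_from_block, b, blocks[b]) for b in block_list]
        block_data = {}
        for future in cf.as_completed(futures):
            b_idx, data = future.result()
            block_data[b_idx] = data

    for b in block_list:
        current_row_id_to_insert = create_rows_from_data(current_row_id_to_insert, b, block_data[b], rows_to_insert)

# add the complete set of new rows for this iteration to the database in one go
insert_rows(db_conn, rows_to_insert)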
# update current_block to the last one in our list
current_block = block_list[-1]
The final step in the loop is to simply update current_block with the last block from block_list, and then the loop repeats.
Testing The Monitoring Server
The monitoring server can be started simply by running python streamer.py in the python directory of this post's source code. This will start looking for interactions with a very simple program we are running on the Solana devnet, the code for which is located in the program directory, and for which there is a rust client in the client directory.
When this starts running you should see output that looks something like:
getting current_block from client
Starting with row: 0 Current block: 150660150
requesting from block 150660150
requesting 1 blocks: [150660151]
[ True]
adding row: (0, 150660151, 'no_choice', 0)
While it is running you can then use the client to interact with the program, passing it the location of a paper wallet, the choice you want to make as an integer from zero to three, and an amount as a final integer, for example:
cargo run YOUR_PAPER_WALLET 1 1
Once this transaction is processed and ends up in a block you will see it appear in the streamer output as follows:
adding row: (90, 150660257, 'Choice.B()', 1)
At any point you should be able to stop the monitoring server and then restart it, and it should just pick up where you left off, downloading any new blocks that have been finalized since you stopped monitoring. As we mentioned at the start, monitoring the Solana blockchain in this way takes quite a lot of requests; you can see our call history over the last few weeks as we have been running this server to test it before our app launch:
On average we are making about three hundred thousand requests a day, or about nine million a month! This is actually slightly worse on devnet than it would be on mainnet, as blocks are produced at roughly double the rate on devnet, and so correspondingly more requests need to be made.
That's it! We hope that you have found this post informative, and if so feel free to follow us on Twitter to keep up to date with future posts!