Table of contents
- Disclaimer
- Introduction
- Audience and Objectives
- Prerequisites to Getting Started
- What are WebSockets?
- What is Binance WebSockets API?
- What is Postgres DB?
- Using Python to connect to Binance WebSockets API
- Streaming Data From Binance WebSockets API
- Storing Crypto Data in a Postgres DB
- Possible Improvements
- Conclusion
It's been almost a year since my last write-up on this blog. I have been busy adjusting to a new job and working on other projects. The idea and code for this project were mostly done, but writing the article and fixing bugs took longer. Let's jump in and code a crypto price bot using Binance WebSockets and Postgres DB!
Disclaimer
This article does not contain any investment advice. I am not a financial advisor. I am not responsible for any losses incurred by following this article. Please do your own research before investing in any cryptocurrency. I am not affiliated with Binance or any other cryptocurrency exchange. 'Binance' is a registered trademark of Binance Holdings Limited.
Introduction
The rise of Web3 and the explosion in cryptocurrency valuations and trading mean there's a tonne of data being generated. This presents a lot of opportunities for data scientists and analysts. However, the data is not always easy to access in real time. This is where the Binance WebSockets API comes in: it lets us stream live market data from Binance, which we can then store in a database and use for analysis.
Audience and Objectives
This article is aimed at developers/software engineers who are interested in learning how to integrate Binance WebSockets API and Postgres DB using Python.
The article will cover the following topics:
- What are WebSockets?
- What is Binance WebSockets API?
- What is Postgres DB?
- Using Python to connect to Binance WebSockets API.
- Streaming data from Binance WebSockets API.
- Storing data from Binance WebSockets API in Postgres DB.
Prerequisites to Getting Started
To effectively follow this article and code, you will need the following:
- Python and pip installed on your machine. Any version above 3.7 should work.
- Git installed on your machine. Check the appropriate instructions for your system/environment. For Arch-Linux users:
```shell
sudo pacman -S git
```
- A Binance account. You can create one here.
- An Integrated Development Environment (IDE) of your choice. I will be using VSCode. You can download it here.
- Postgres DB installed on your machine. You can download it here. For Arch-Linux users:
```shell
sudo pacman -S postgresql
```
- A Postgres DB client of your choice. I will be using pgAdmin4. You can download it here. For Arch-Linux users:
```shell
sudo pacman -S pgadmin4
```
Alternatively, you can run Postgres and pgAdmin4 as Docker containers. This article is an awesome resource on how to set them up that way.
- A Binance API key and secret. You can create one here. Make sure to save your API key and secret somewhere safe. You will need them later.
What are WebSockets?
WebSockets are a protocol for establishing a persistent, two-way connection between a client and a server. Instead of the client repeatedly polling the server over HTTP, both sides can push messages to each other in real time over the same open connection.
This makes WebSockets useful for applications that need data to be updated in real time. A chat application, for example, delivers messages between users as soon as they are sent, and a stock market application can push price updates to the client as prices change.
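Under the hood, a WebSocket connection starts as an ordinary HTTP request that asks the server to upgrade the protocol. As a small illustration (our bot does not need to do this itself, since the websocket-client library handles it), here is a sketch of the handshake check defined in RFC 6455: the server proves it understood the upgrade request by hashing the client's Sec-WebSocket-Key together with a fixed GUID. The function name sec_websocket_accept is my own.

```python
import base64
import hashlib

# Fixed GUID defined by RFC 6455 for the opening handshake
WEBSOCKET_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def sec_websocket_accept(client_key: str) -> str:
    """Compute the Sec-WebSocket-Accept value a server returns for a client's Sec-WebSocket-Key."""
    digest = hashlib.sha1((client_key + WEBSOCKET_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


if __name__ == "__main__":
    # Sample key from RFC 6455, section 1.3
    print(sec_websocket_accept("dGhlIHNhbXBsZSBub25jZQ=="))  # s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

Once the client sees the expected value in the server's response, the connection stays open and both sides exchange WebSocket frames instead of HTTP requests.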
What is Binance WebSockets API?
The Binance WebSockets API allows us to stream live data from Binance. This data includes:
- Trades and Aggregate Trade History
- Kline/Candlestick Information
- Market Rolling Information
- Individual Symbol Ticker Information

...among many others. Check the full list here.
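Binance exposes each of these feeds as a named stream, and several streams can be combined into a single connection. The sketch below builds such a combined-stream URL for kline data, assuming the stream.binance.com endpoint and the lowercase `<symbol>@kline_<interval>` naming from the Binance documentation; the helper name kline_stream_url is hypothetical.

```python
# Base endpoint for Binance (binance.com) market data streams
BASE_URL = "wss://stream.binance.com:9443"


def kline_stream_url(symbols, interval="1m"):
    """Build a combined-stream URL for kline (candlestick) data.

    Hypothetical helper: stream names are lowercase, e.g. "btcusdt@kline_1m",
    and multiple streams are joined with "/".
    """
    streams = "/".join(f"{symbol.lower()}@kline_{interval}" for symbol in symbols)
    return f"{BASE_URL}/stream?streams={streams}"


if __name__ == "__main__":
    print(kline_stream_url(["ETHUSDT", "BTCUSDT"]))
```

For the ETHUSDT and BTCUSDT pairs this produces the same endpoint the bot uses later, and adding more pairs becomes a matter of extending the list of symbols.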
What is Postgres DB?
Postgres DB (PostgreSQL) is an open-source relational database management system (RDBMS). It is a powerful tool for storing and querying data, which also makes it a great tool for data analysis.
Psycopg2 is a Python library (a database driver) that allows us to connect to Postgres DB. We will use Psycopg2 to connect to Postgres DB and store data from the Binance WebSockets API.
SQLAlchemy is another Python library for working with databases. It sits on top of drivers such as Psycopg2, and we will use it to create a database schema for our data.
Using Python to connect to Binance WebSockets API
Initial Setup
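The following steps are verified to work on Arch-Linux. However, they should also work on other Linux distributions and macOS.
Note: For Windows users, your implementation may differ slightly. For example, in cmd.exe the virtual environment is activated with venv\Scripts\activate.bat instead of source venv/bin/activate.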
Create a new directory for your project and change into it. I will be using 'py-crypto-bot' as my project directory. You can use any name you like.
```shell
mkdir py-crypto-bot && cd py-crypto-bot
```
Create a new virtual environment and activate it. I will be using 'venv' as my virtual environment name. You can use any name you like.
```shell
python -m venv venv && source venv/bin/activate
```
Install the following Python libraries:
```shell
pip install python-dotenv websocket-client psycopg2-binary sqlalchemy python-binance
```
Create a new file called '.env' and add the following:
```
# .env file
BINANCE_API_KEY=YOUR_API_KEY
BINANCE_API_SECRET=YOUR_API_SECRET
DB=postgresql+psycopg2://dbuser:dbpassword@localhost:5432/dbname
```
Ensure you enter your correct API key and secret, as well as your database credentials (user, password, host, port and database name).
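The DB value follows SQLAlchemy's URL format, dialect+driver://user:password@host:port/dbname. As a quick sanity check before running anything, the sketch below splits such a string into its parts using only the standard library. The describe_db_url helper is hypothetical and is not part of SQLAlchemy.

```python
from urllib.parse import urlsplit


def describe_db_url(url):
    """Split a SQLAlchemy-style database URL into its components (hypothetical helper)."""
    parts = urlsplit(url)
    # "postgresql+psycopg2" -> dialect "postgresql", driver "psycopg2"
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


if __name__ == "__main__":
    print(describe_db_url("postgresql+psycopg2://dbuser:dbpassword@localhost:5432/dbname"))
```

If your password contains special characters such as @ or /, URL-encode it (for example with urllib.parse.quote_plus) before putting it into the DB string, otherwise the parts will be split in the wrong places.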
Create a new file called 'bot.py' and add the following:
```python
# bot.py
import os
import json
from pprint import pprint

import websocket as wb
from binance.client import Client
from dotenv import load_dotenv

# load environment variables from the .env file
load_dotenv()

# create a python-binance client with the credentials from .env
client = Client(os.getenv('BINANCE_API_KEY'), os.getenv('BINANCE_API_SECRET'))
```
The above code does the following:
- Imports the required libraries.
- Loads environment variables from the '.env' file.
- Creates a Binance client with the API key and secret from the '.env' file.
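One thing to watch out for: os.getenv returns None when a variable is missing, so a typo in '.env' may only surface later as a confusing authentication error. A minimal sketch of a fail-fast alternative, using a hypothetical require_env helper:

```python
import os


def require_env(name):
    """Return the value of an environment variable, or fail loudly if it is missing or empty."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


if __name__ == "__main__":
    os.environ["EXAMPLE_KEY"] = "abc123"
    print(require_env("EXAMPLE_KEY"))
```

With it, the client line would read Client(require_env('BINANCE_API_KEY'), require_env('BINANCE_API_SECRET')), and a missing key stops the bot at startup with a clear message.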
The Tables Are Coming
We will be using SQLAlchemy to create a database schema for our data. We will be using the following table:
- crypto - This table will store the candlestick (kline) price data.

In order to keep our project well structured and organized, we put the database connection and schema code into separate files. Create a new file called base_sql.py and add the following:
```python
# base_sql.py
import os
from pprint import pprint

from dotenv import load_dotenv
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

load_dotenv()

# use the database string from the .env file if there is one,
# otherwise fall back to the default string below
db = os.environ.get("DB") or "postgresql+psycopg2://test:testpassword@localhost:5432/price_data"

engine = create_engine(db)

# open (and immediately release) a connection so bad credentials fail early
with engine.connect():
    pprint(f"connection successful! : {engine}")

# create a session factory. Allows all our transactions to be run in the context of a session
Session = sessionmaker(bind=engine)

# base class that our table classes inherit from
Base = declarative_base()
```
The code above does the following:
- Imports the required libraries.
- Loads the required environment variables from the .env file.
- Checks if there is a database string in the .env file. If there is no database string, it uses the default string.
- Creates a database engine.
- Connects to the database engine and outputs a success message if the connection is successful.
- Creates a session factory and binds it to the database engine. This allows all our transactions to be run in the context of a session.
- Creates a base class for our database schema.
Create a new file called price_data_sql.py and add the following code to it:
```python
# price_data_sql.py
from sqlalchemy import Column, DateTime, Float, Integer, String

from base_sql import Base


class CryptoPrice(Base):
    __tablename__ = "crypto"

    id = Column(Integer, primary_key=True)
    crypto_name = Column(String(90))
    close_price = Column(Float())
    open_price = Column(Float())
    high_price = Column(Float())
    low_price = Column(Float())
    volume = Column(Float())
    # DateTime (not Date) so the time of day is kept and time-window queries work
    time = Column(DateTime(timezone=True))

    def __init__(self, crypto_name, close_price, open_price,
                 high_price, low_price, volume, time):
        self.crypto_name = crypto_name
        self.open_price = open_price
        self.close_price = close_price
        self.high_price = high_price
        self.low_price = low_price
        self.volume = volume
        self.time = time
```
This file holds the database schema for our data. Let's go through the code above:
- We first import the required datatypes from the sqlalchemy library.
- We then import the Base class from the base_sql.py file.
- We then create a class called CryptoPrice that inherits from the base class, and set its table name to crypto. SQLAlchemy is an Object Relational Mapper (ORM) and it uses classes to represent tables in the database.
- Inside the CryptoPrice class we define the structure of our table: the columns and their datatypes. We also define the primary key for the table, in this case the id column.
- We map different columns to different datatypes. For example, the time column is mapped to the DateTime datatype (with time zone), so both the date and the time of day are stored. The volume column is mapped to the Float datatype.
- Finally, we define the __init__ method for the class. This method is called when we create an instance of the class, and we use it to initialize the class attributes.
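To see how the Binance fields line up with these columns, here is a sketch that turns the k object of a kline event into keyword arguments for the CryptoPrice constructor. It assumes the kline field names from the Binance documentation (o, c, h, l, v for prices and volume, s for the symbol, T for the candle close time in milliseconds). The kline_to_row helper and the sample values are made up for illustration.

```python
from datetime import datetime, timezone


def kline_to_row(candle):
    """Map a Binance kline ("k") object to keyword arguments for CryptoPrice.

    Hypothetical helper: Binance sends prices and volume as strings, so they
    are converted to floats to match the Float columns.
    """
    return {
        "crypto_name": candle["s"],
        "open_price": float(candle["o"]),
        "close_price": float(candle["c"]),
        "high_price": float(candle["h"]),
        "low_price": float(candle["l"]),
        "volume": float(candle["v"]),
        # "T" is the kline close time in milliseconds since the Unix epoch
        "time": datetime.fromtimestamp(candle["T"] / 1000, tz=timezone.utc),
    }


if __name__ == "__main__":
    sample = {"s": "ETHUSDT", "o": "1200.10", "c": "1201.50", "h": "1202.00",
              "l": "1199.80", "v": "350.25", "T": 1672531259999, "x": True}
    print(kline_to_row(sample))
```

A row could then be created with CryptoPrice(**kline_to_row(candle)). Using the candle's own close time rather than the time the message arrived keeps timestamps aligned with the candles even if messages are delayed.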
Now we create a new file, database_insert.py, that we'll run to create our tables with the required structure. Add the following code to the file:
```python
# database_insert.py
from base_sql import engine, Base, Session
# importing the model registers the CryptoPrice table on Base.metadata
import price_data_sql  # noqa: F401


def create_table():
    # create a new session
    session = Session()
    # attempts to create the base tables in a session. In case of an issue, rolls back the session
    try:
        # generate the schema (only creates tables that do not exist yet)
        Base.metadata.create_all(engine)
        session.commit()
        print("connected & db populated")
    except Exception as e:
        session.rollback()
        print(e)
    finally:
        session.close()


if __name__ == "__main__":
    create_table()
```
This file is a script that we will run to create our tables in the context of a session. Let's go through the code above:
- First we import engine, Base and Session from the base_sql.py file.
- We then create a function called create_table that does the following:
  - Creates a new session.
  - Runs Base.metadata.create_all(engine) inside a try block. If there is an error, we roll back the session and print the error.
  - Closes the session.
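The try/commit/rollback/close sequence in create_table comes up again whenever we write to the database. One common way to package it is a context manager. The sketch below assumes only a session factory whose sessions have commit, rollback and close methods (our Session qualifies); the session_scope name and the FakeSession test double are illustrative. SQLAlchemy 1.4 and later also offer Session.begin() on a sessionmaker for the same purpose.

```python
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory):
    """Provide a transactional scope: commit on success, roll back on error, always close."""
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


class FakeSession:
    """Stand-in for a SQLAlchemy session that records which methods were called."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


if __name__ == "__main__":
    fake = FakeSession()
    with session_scope(lambda: fake):
        pass
    print(fake.calls)  # ['commit', 'close']
```

In the bot this would read `with session_scope(Session) as session: session.add(crypto)`. Unlike create_table, the sketch re-raises the error after rolling back, so failures are not silently swallowed.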
Streaming Data From Binance WebSockets API
We now have a proper database structure that we can use to store the data we get from the Binance WebSockets API. Next, we will write the code that streams data from the Binance WebSockets API and inserts it into our database. Add the following code to the bot.py file:
```python
# bot.py
from datetime import datetime, timezone

from database_insert import create_table
from base_sql import Session
from price_data_sql import CryptoPrice

# this function creates the table if it does not exist
create_table()

# create a new session
session = Session()

# combined stream: 1 minute kline (candlestick) data for ETHUSDT and BTCUSDT
BINANCE_SOCKET = "wss://stream.binance.com:9443/stream?streams=ethusdt@kline_1m/btcusdt@kline_1m"

closed_prices = []

# read the same variable names that are defined in the .env file
API_KEY = os.environ.get("BINANCE_API_KEY")
API_SECRET = os.environ.get("BINANCE_API_SECRET")
# binance.com client, matching the binance.com stream above (tld='us' targets Binance.US)
client = Client(API_KEY, API_SECRET)


def on_open(ws):
    print("connection opened")


# websocket-client 1.x passes the close status code and message to on_close
def on_close(ws, close_status_code, close_msg):
    print("closed connection")


def on_error(ws, error):
    print(error)


def on_message(ws, message):
    message = json.loads(message)
    pprint(message)
    # combined streams wrap each event as {"stream": ..., "data": {...}}
    data = message["data"]
    candle = data["k"]
    trade_symbol = data["s"]
    is_candle_closed = candle["x"]

    if is_candle_closed:
        closed = float(candle["c"])
        open_price = float(candle["o"])
        high = float(candle["h"])
        low = float(candle["l"])
        volume = float(candle["v"])
        pprint(f"closed: {closed}")
        pprint(f"open: {open_price}")
        pprint(f"high: {high}")
        pprint(f"low: {low}")
        pprint(f"volume: {volume}")
        closed_prices.append(closed)
        # create price entry; "T" is the candle close time in milliseconds
        crypto = CryptoPrice(crypto_name=trade_symbol, open_price=open_price, close_price=closed,
                             high_price=high, low_price=low, volume=volume,
                             time=datetime.fromtimestamp(candle["T"] / 1000, tz=timezone.utc))
        try:
            session.add(crypto)
            session.commit()
        except Exception as e:
            session.rollback()
            print(e)
        finally:
            session.close()


ws = wb.WebSocketApp(BINANCE_SOCKET, on_open=on_open, on_close=on_close,
                     on_error=on_error, on_message=on_message)
ws.run_forever()
```
Let's walk through the code above:
- We first import the required libraries, including the create_table function from the database_insert.py file, the Session class from the base_sql.py file and the CryptoPrice class from the price_data_sql.py file.
- We call the create_table function to create the table if it does not exist in our database.
- We create a new session with the Session class. This enables us to run our database operations in the context of a session.
- From the Binance documentation we get the WebSockets API endpoint for the ETHUSDT and BTCUSDT pairs, and store it in the BINANCE_SOCKET variable.
- We create an empty list named closed_prices. This list will hold the close prices of the candles.
- We read API_KEY and API_SECRET from the environment variables and create a new client with the Client class.

Following the conventions of the websocket-client library, we create the on_open, on_close, on_error and on_message functions. These functions are called when the WebSockets connection is opened, when it is closed, when an error occurs and when a message is received, respectively.

Let's walk through the on_message function:
- We first parse the JSON message received from the websocket into a Python dictionary.
- We then get the status of the candle from the x key, a boolean that is true once the candle has closed, as well as the trade symbol from the s key.
- An if statement checks whether the candle is closed. If it is, we get the open, close, high and low prices and the volume from the o, c, h, l and v keys respectively.
- We append the close price to the closed_prices list.
- We assign the values to an instance of the CryptoPrice class, add the instance to the session and commit the changes to the database. If the commit fails, we roll back the session and print the error.
- We then close the session.

Finally:
- We define the ws variable with the WebSocketApp class, passing the BINANCE_SOCKET variable as the first argument and the on_open, on_close, on_error and on_message functions as the corresponding keyword arguments.
- We run the websocket app with the run_forever method.
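Because BINANCE_SOCKET uses the combined-stream endpoint (/stream?streams=...), the Binance documentation states that each event is wrapped in an envelope of the form {"stream": ..., "data": ...}, so the kline fields live under data. The sketch below isolates that parsing step into a small function that can be tested without a live connection. The extract_closed_kline name and the sample message are made up for illustration.

```python
import json


def extract_closed_kline(raw_message):
    """Return the kline ("k") object from a stream message if the candle is closed, else None.

    Combined streams wrap each event as {"stream": "<name>", "data": <event>};
    falling back to the message itself also accepts a bare event from a single raw stream.
    """
    message = json.loads(raw_message)
    event = message.get("data", message)
    candle = event["k"]
    # "x" is true once the candle for this interval has closed
    return candle if candle["x"] else None


if __name__ == "__main__":
    raw = json.dumps({
        "stream": "ethusdt@kline_1m",
        "data": {"e": "kline", "s": "ETHUSDT",
                 "k": {"s": "ETHUSDT", "c": "1201.50", "x": True}},
    })
    print(extract_closed_kline(raw))
```

Inside on_message, the rest of the handler would then only run when this returns a candle, which keeps the database code free of JSON-shape details.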
Storing Crypto Data in a Postgres DB
As described in the section above, we add a row of crypto price data every time the websocket reports a closed candle, and store it in a Postgres database. Here is a snapshot of ETHUSDT data in the database:
Now we can easily query the database to get the data we need. For example, we could get the average close price of ETHUSDT over the last 10 minutes by running the following query (this relies on the time column storing a full timestamp rather than just a date):
```sql
SELECT AVG(close_price) FROM crypto WHERE crypto_name = 'ETHUSDT' AND time > NOW() - INTERVAL '10 minutes';
```
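The bot also keeps close prices in memory, so a similar figure can be computed in Python without a round trip to the database: with 1-minute candles, the last 10 closes cover roughly the last 10 minutes. Note that in bot.py the closed_prices list mixes ETHUSDT and BTCUSDT closes, so this sketch keeps a separate window per symbol. The RollingCloses class is hypothetical.

```python
from collections import defaultdict, deque

WINDOW = 10  # number of 1-minute candles to average over


class RollingCloses:
    """Keep the last `window` close prices per symbol and report their average (illustrative)."""

    def __init__(self, window=WINDOW):
        # a bounded deque drops the oldest price automatically once it is full
        self._prices = defaultdict(lambda: deque(maxlen=window))

    def add(self, symbol, close_price):
        self._prices[symbol].append(float(close_price))

    def average(self, symbol):
        # .get() does not trigger the defaultdict factory, so unknown symbols stay absent
        prices = self._prices.get(symbol)
        if not prices:
            return None
        return sum(prices) / len(prices)


if __name__ == "__main__":
    closes = RollingCloses(window=3)
    for price in ["1200", "1201", "1202", "1203"]:
        closes.add("ETHUSDT", price)
    print(closes.average("ETHUSDT"))  # 1202.0
```

The database query remains the source of truth after a restart; the in-memory window is only a cheap way to react to the latest candles.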
Having the data in a database opens up a plethora of opportunities, from data analysis to machine learning. We could also use the data to build a trading bot that trades based on the prices stored in the database.
Possible Improvements
- Dockerize the script and database for easier setup and for running in production.
- Use Python to query data from Postgres DB.
- Use Python to plot data from Postgres DB.
- Add more pairs to the script.
- Implement caching, e.g. Redis, for quicker read/write operations.
- Add a notification system, e.g. SMS/email, when the price reaches a required threshold or a candle closes.
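For the notification idea, the core logic is detecting when the price crosses a threshold between two consecutive closed candles, so that an alert fires once rather than on every candle above the level. A minimal sketch; the function name is hypothetical and the actual SMS/email delivery is left out.

```python
def crossed_threshold(previous_close, current_close, threshold):
    """Return "up" or "down" if the price crossed `threshold` between two closes, else None."""
    if previous_close is None:
        # no earlier candle yet, so there is nothing to compare against
        return None
    if previous_close < threshold <= current_close:
        return "up"
    if previous_close > threshold >= current_close:
        return "down"
    return None


if __name__ == "__main__":
    print(crossed_threshold(1995.0, 2003.5, 2000.0))  # up
```

The previous close could come from the closed_prices list (per symbol) or from the latest row in the crypto table.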
Conclusion
This is just a simple outline for a crypto bot; there are a lot of improvements that can be made. You can find the code for this project on GitHub.
I hope this article was helpful and you were able to learn something new. If you have any questions or suggestions please feel free to reach out to me on Twitter or LinkedIn.