Crypto Data Bot using Python, Binance WebSockets and PostgreSQL DB

It's been almost a year since my last write-up on this blog. I have been busy adjusting to a new job and other projects. The idea and code for this project were mostly done, but the article and bug fixes took longer. Let's jump in and code a crypto price bot using Binance WebSockets and Postgres DB!

Disclaimer

This article does not contain any investment advice. I am not a financial advisor. I am not responsible for any losses incurred by following this article. Please do your own research before investing in any cryptocurrency. I am not affiliated with Binance or any other cryptocurrency exchange. 'Binance' is a registered trademark of Binance Holdings Limited.

Introduction

Crypto illustration

The rise of Web3 and the explosion of cryptocurrency valuations and trading mean there's a tonne of data being generated. This presents a lot of opportunities for data scientists and analysts. However, the data is not easily accessible. This is where the Binance WebSockets API comes in. It allows us to stream live data from Binance. We can then store this data in a database and use it for analysis.

Audience and Objectives

This article is aimed at developers/software engineers who are interested in learning how to integrate Binance WebSockets API and Postgres DB using Python.

The article will cover the following topics:

  • What are WebSockets?
  • What is Binance WebSockets API?
  • What is Postgres DB?
  • Using Python to connect to Binance WebSockets API.
  • Streaming data from Binance WebSockets API.
  • Storing data from Binance WebSockets API in Postgres DB.

Prerequisites to Getting Started

To effectively follow this article and code, you will need to have the following:

  • Python and pip installed on your machine. Any version above 3.7 should work.
  • Git installed on your machine. Check appropriate instructions for your system/environment. For Arch-Linux users:

    sudo pacman -S git
    
  • A Binance account. You can create one here.

  • Integrated Development Environment (IDE) of your choice. I will be using VSCode. You can download it here.
  • Postgres DB installed on your machine. You can download it here. For Arch-Linux users:

    sudo pacman -S postgresql
    
  • Postgres DB client of your choice. I will be using pgAdmin4. You can download it here. For Arch-Linux users:

    sudo pacman -S pgadmin4
    

    Alternatively, install them via Docker. This article is an awesome resource on how to get PostgreSQL and pgAdmin4 installed as containers.

  • A Binance API key and secret. You can create one here. Make sure to save your API key and secret somewhere safe. You will need them later.

What are WebSockets?

WebSockets establish a persistent, two-way connection between a client and a server. Once the connection is open, either side can send a message at any time, so the client does not have to keep polling the server with new requests.

This is useful for applications that require real-time data. For example, a chat application can exchange messages as they are typed, and a stock market application can push prices to the client as soon as they change.

What is Binance WebSockets API?

Binance WebSockets API allows us to stream live data from Binance. This data includes:

  • Trades and Aggregate Trade History
  • Kline/Candlestick Information
  • Market Rolling Information
  • Individual Symbol Ticker Information among many others. Check full list here
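Each stream is addressed by a lowercase name such as ethusdt@kline_1m, and several names can be joined into one combined-stream URL. The sketch below builds such a URL for kline streams. The helper name build_kline_stream_url is hypothetical (it is not part of any library), and the host is the one used for the socket later in this article.

```python
# Host of the public Binance stream endpoint used later in this article.
BASE_URL = "wss://stream.binance.com:9443"


def build_kline_stream_url(symbols, interval="1m"):
    """Return a combined-stream URL for the kline streams of the given symbols.

    Hypothetical helper for illustration only.
    """
    if not symbols:
        raise ValueError("at least one symbol is required")
    # Stream names are lowercase: <symbol>@kline_<interval>
    streams = "/".join(f"{symbol.lower()}@kline_{interval}" for symbol in symbols)
    return f"{BASE_URL}/stream?streams={streams}"


if __name__ == "__main__":
    print(build_kline_stream_url(["ETHUSDT", "BTCUSDT"]))
```

Adding another pair then only means adding another symbol to the list, rather than editing the URL by hand.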

What is Postgres DB?

Postgres DB is a relational database management system (RDBMS). It is an open-source database. It is a powerful tool for storing and querying data. It is also a great tool for data analysis.

Psycopg2 is a Python driver for Postgres DB. It is what actually talks to the database; note the postgresql+psycopg2 prefix in the connection string we use later.

SQLAlchemy is another Python library that allows us to work with Postgres DB. We will use SQLAlchemy to create a database schema for our data and to store the data we receive from the Binance WebSockets API.

Using Python to connect to Binance WebSockets API

Initial Setup

The following steps are verified to work on Arch-Linux. However, they should also work on other Linux distributions and macOS.

Note: For Windows users, your implementation may differ slightly.

  1. Create a new directory for your project and change to it. I will be using 'py-crypto-bot' as my project directory. You can use any name you like.

    mkdir py-crypto-bot && cd py-crypto-bot
    
  2. Create a new virtual environment and activate it. I will be using 'venv' as my virtual environment name. You can use any name you like.

    python -m venv venv && source venv/bin/activate
    
  3. Install the following Python libraries:

    pip install python-dotenv websocket-client psycopg2-binary sqlalchemy python-binance
    
  4. Create a new file called '.env' and add the following:

    # .env file
    BINANCE_API_KEY=YOUR_API_KEY
    BINANCE_API_SECRET=YOUR_API_SECRET
    DB=postgresql+psycopg2://dbuser:dbpassword@localhost:5432/dbname
    

    Ensure you enter your correct API key and secret, as well as the database credentials.

  5. Create a new file called 'bot.py' and add the following:

  # bot.py
  import os
  import websocket as wb
  from pprint import pprint
  import json
  from binance.client import Client
  from binance.enums import *
  from dotenv import load_dotenv


  # load environment variables
  load_dotenv()

  # create client
  client = Client(os.getenv('BINANCE_API_KEY'), os.getenv('BINANCE_API_SECRET'))

The above code does the following:

  • Imports the required libraries.
  • Loads environment variables from the '.env' file.
  • Creates a Binance client using the API key and secret.
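A missing or misspelled entry in the '.env' file only shows up later as a None value, so it can help to check the settings right after load_dotenv() has run. The following is a minimal sketch, assuming all three entries from the '.env' file above are required. The names REQUIRED_SETTINGS and missing_settings are hypothetical.

```python
import os

# Names of the entries defined in the .env file above (assumed to be required).
REQUIRED_SETTINGS = ("BINANCE_API_KEY", "BINANCE_API_SECRET", "DB")


def missing_settings(env=None):
    """Return the names of required settings that are absent or empty."""
    # Default to the process environment, which load_dotenv() populates.
    env = os.environ if env is None else env
    return [name for name in REQUIRED_SETTINGS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_settings()
    if missing:
        print("Missing settings:", ", ".join(missing))
    else:
        print("All settings present")
```

Passing a plain dictionary instead of os.environ makes the check easy to try out without touching your real environment.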

The Tables Are Coming

We will use SQLAlchemy to create a database schema for our data, with the following table:

  • crypto - This table will store the candlestick price data.

In order to keep our project well structured and organized, we separate the database schema and connection code into separate files. Create a new file called base_sql.py and add the following:

# base_sql.py
import os

from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4
from sqlalchemy.orm import declarative_base, sessionmaker
from pprint import pprint
from dotenv import load_dotenv

load_dotenv()

# use the database string from the .env file if there is one,
# otherwise fall back to the default string below
db = os.environ.get("DB") or "postgresql+psycopg2://test:testpassword@localhost:5432/price_data"

engine = create_engine(db)
# open a connection to verify the database is reachable, then release it
with engine.connect():
    pass

pprint(f"connection successful! : {engine}")
# create a session factory. Allows all our transactions to be run in the context of a session
Session = sessionmaker(bind=engine)

Base = declarative_base()

The code above does the following:

  • Imports the required libraries.
  • Loads the required environment variables from the .env file.
  • Checks if there is a database string in the .env file. If there is no database string, it uses the default string.
  • Creates a database engine.
  • Connects to the database engine.
  • Creates a session factory and binds it to the database engine. This allows all our transactions to be run in the context of a session.
  • Creates a base class for our database schema.
  • Outputs a success message if the connection is successful.

Create a new file called price_data_sql.py and add the following code to it:

# price_data_sql.py
from sqlalchemy import DateTime, Column, Integer, String, Float

from base_sql import Base

class CryptoPrice(Base):
    __tablename__ = "crypto"

    id = Column(Integer, primary_key=True)
    crypto_name = Column(String(90))
    close_price = Column(Float())
    open_price = Column(Float())
    high_price = Column(Float())
    low_price = Column(Float())
    volume = Column(Float())
    # DateTime keeps the time of day; Date would only keep the calendar date
    time = Column(DateTime())

    def __init__(self, crypto_name, close_price, open_price,
                 high_price, low_price, volume, time):
        self.crypto_name = crypto_name
        self.open_price = open_price
        self.close_price = close_price
        self.high_price = high_price
        self.low_price = low_price
        self.volume = volume
        self.time = time

This file holds the database schema for our data. Let's go through the code above:

  • We first import the required datatypes from the sqlalchemy library.
  • We then import the base class from the base_sql.py file.
  • We then create a class called CryptoPrice and inherit from the base class. We also specify the table name as crypto. SQLAlchemy is an Object Relational Mapper (ORM) and it uses classes to represent tables in the database.
  • Inside the CryptoPrice class we proceed to define the structure of our table. We define the columns and their datatypes. We also define the primary key for the table. In this case, we use the id column as the primary key.
  • We map different columns to different datatypes. For example, the time column is mapped to the DateTime datatype, so each row keeps the time of day as well as the date. The volume column is mapped to the Float datatype.
  • Finally, we define the __init__ method for the class. This method is called when we create an instance of the class. We use this method to initialize the class attributes.

Now we create a new file, database_insert.py, which holds the function that creates our tables with the required structure. Add the following code to the file:

# database_insert.py
from base_sql import engine, Base, Session

def create_table():
    # create a new session
    session = Session()
    # attempts to create the base tables in a session. in case of an issue rolls back the session
    try:
        # Generate schema
        Base.metadata.create_all(engine)
        session.commit()
        print("connected & db populated")
    except Exception as e:
        session.rollback()
        print(e)
    finally:
        # always release the session, whether or not the schema was created
        session.close()

This file defines a function that creates our tables in the context of a session. Let's go through the code above:

  • First we import engine, Base and Session from the base_sql.py file.
  • We then create a function called create_table that does the following:
    • Creates a new session.
    • Generates the schema in a try block. If there is an error, it rolls back the session and prints the error.
    • Closes the session.
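The commit, rollback and close steps described above recur wherever a session is used, so they can be wrapped in a context manager. The sketch below is one possible way to do that, not the code used in this project. The name session_scope is hypothetical, and unlike create_table it re-raises the error after rolling back instead of printing it.

```python
from contextlib import contextmanager


@contextmanager
def session_scope(session_factory):
    """Yield a session; commit on success, roll back on error, always close.

    session_factory is any callable that returns a session, for example
    the Session factory defined in base_sql.py.
    """
    session = session_factory()
    try:
        yield session
        session.commit()
    except Exception:
        # Undo the pending changes, then let the caller see the error.
        session.rollback()
        raise
    finally:
        session.close()
```

With this helper, a write becomes "with session_scope(Session) as session: session.add(row)", and the session is released even when the insert fails.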

Streaming Data From Binance WebSockets API

Streaming data illustration

We now have a proper database structure defined that we can use to insert the data we get from the Binance WebSockets API. We will now create a function that will stream data from the Binance WebSockets API and insert the data into our database. Add the following code to the bot.py file:

# bot.py
from datetime import datetime
from database_insert import create_table
from base_sql import Session
from price_data_sql import CryptoPrice

# this function creates the table if it does not exist
create_table()

# create a new session
session = Session()

BINANCE_SOCKET = "wss://stream.binance.com:9443/stream?streams=ethusdt@kline_1m/btcusdt@kline_1m"
closed_prices = []


# use the same variable names that are defined in the .env file
API_KEY = os.environ.get("BINANCE_API_KEY")
API_SECRET = os.environ.get("BINANCE_API_SECRET")
# tld='us' points the client at Binance.US; the public kline stream below does not use this client
client = Client(API_KEY, API_SECRET, tld='us')

def on_open(ws):
    print("connection opened")


# websocket-client 1.x also passes the close status code and message to on_close
def on_close(ws, close_status_code, close_msg):
    print("closed connection")


def on_error(ws, error):
    print(error)


def on_message(ws, message):
    message = json.loads(message)
    pprint(message)
    # combined streams (/stream?streams=...) wrap each event as {"stream": ..., "data": {...}}
    data = message.get('data', message)
    candle = data['k']
    trade_symbol = data['s']
    is_candle_closed = candle['x']
    global closed_prices
    if is_candle_closed:
        symbol = candle['s']
        # values arrive as strings, so convert them to floats
        closed = float(candle['c'])
        # named open_price to avoid shadowing the built-in open()
        open_price = float(candle['o'])
        high = float(candle['h'])
        low = float(candle['l'])
        volume = float(candle['v'])
        pprint(f"closed: {closed}")
        pprint(f"open: {open_price}")
        pprint(f"high: {high}")
        pprint(f"low: {low}")
        pprint(f"volume: {volume}")
        closed_prices.append(closed)
        # create price entries
        crypto = CryptoPrice(crypto_name=symbol, open_price=open_price, close_price=closed,
                             high_price=high, low_price=low, volume=volume, time=datetime.utcnow())
        try:
            session.add(crypto)
            session.commit()
        except Exception as e:
            session.rollback()
            print(e)
        session.close()

ws = wb.WebSocketApp(BINANCE_SOCKET, on_open=on_open, on_close=on_close, on_error=on_error, on_message=on_message)
ws.run_forever()

Let's walk through the code above:

  • We first import the required libraries, including the create_table function from the database_insert.py file, the Session class from the base_sql.py file and the CryptoPrice class from the price_data_sql.py file.
  • We proceed to call the create_table function to create the table if it does not exist in our database.
  • Additionally, we create a new session with the Session class. This will enable us to run our database operations in the context of a session.
  • From the Binance documentation we get the WebSockets API endpoint for the ETHUSDT and BTCUSDT pairs. We then store the endpoint in the BINANCE_SOCKET variable.
  • We create an empty list named closed_prices. This list will hold the closing prices of the candles.
  • We then read the API key and secret from the environment variables into API_KEY and API_SECRET, and create a new client with the Client class.
  • Following the callback convention of the websocket library, we create the on_open, on_close, on_error and on_message functions. These functions are called when the WebSockets connection is opened, when it is closed, when an error occurs and when a message is received, respectively.

    Let's walk through the on_message function:

    • We first parse the data received from the websocket from JSON into a Python dictionary.
    • We then get the status of the candle from the x key of the candle, which is true once the candle has closed, as well as the trade symbol from the s key.
    • An if statement checks whether the candle is closed. If it is, we proceed to get the open, close, high and low prices and the volume from the o, c, h, l and v keys respectively.
    • We add the closing price to the closed_prices list.
    • We assign the values to an instance of the CryptoPrice class. We then add the instance to the session and commit the changes to the database.
    • We then close the session.

  • We then define the ws variable with the WebSocketApp class. We pass the BINANCE_SOCKET variable as the first argument, followed by the on_open, on_close, on_error and on_message functions as the arguments of the same names.
  • Finally, we run the websocket app with the run_forever method.
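The parsing steps in on_message can also be isolated in a small function that is easy to test without a live connection. The sketch below is an illustration under a few assumptions rather than the project's code: the name parse_closed_kline is hypothetical, it accepts both plain and combined-stream messages (combined streams wrap each event in a "data" key), and it uses the candle close time from the T key instead of the local clock.

```python
import json
from datetime import datetime, timezone


def parse_closed_kline(raw_message):
    """Return a dict of candle fields, or None if the candle is still open."""
    message = json.loads(raw_message)
    # Combined streams wrap the event as {"stream": ..., "data": {...}};
    # single streams send the event directly.
    event = message.get("data", message)
    candle = event["k"]
    if not candle["x"]:  # "x" is true only once the candle has closed
        return None
    return {
        "crypto_name": candle["s"],
        # Prices and volume arrive as strings, so convert them to floats.
        "open_price": float(candle["o"]),
        "close_price": float(candle["c"]),
        "high_price": float(candle["h"]),
        "low_price": float(candle["l"]),
        "volume": float(candle["v"]),
        # "T" is the candle close time in milliseconds since the epoch.
        "time": datetime.fromtimestamp(candle["T"] / 1000, tz=timezone.utc),
    }
```

The keys match the column names of the CryptoPrice class, so a row could be created with CryptoPrice(**parsed) whenever the function returns a dictionary.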

Storing Crypto Data in a Postgres DB

Database Illustration

As mentioned in the above section, we add a row of crypto price data each time the websocket reports that a candle has closed, and the data is stored in a Postgres database. Here is a snapshot of ETHUSDT data in the database:

ETHUSDT data in the database

Now we could easily query the database to get the data we need. For example, we could get the average price of ETHUSDT for the last 10 minutes by running the following query:

SELECT AVG(close_price) FROM crypto WHERE crypto_name = 'ETHUSDT' AND time > NOW() - INTERVAL '10 minutes';

Having the data in a database opens up a plethora of opportunities, from data analysis to machine learning. We could also use the data to create a trading bot that trades based on the data we get from the database.
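The same kind of average can be kept in memory while the bot runs. With one-minute candles, the mean of the last ten closing prices roughly corresponds to the ten-minute query above. The sketch below is a minimal illustration, and the class name RollingAverage is hypothetical. Since the closed_prices list in bot.py mixes both pairs, it assumes you keep one instance per symbol.

```python
from collections import deque


class RollingAverage:
    """Average of the most recent `size` values."""

    def __init__(self, size):
        if size < 1:
            raise ValueError("size must be at least 1")
        # A deque with maxlen drops the oldest value automatically.
        self._values = deque(maxlen=size)

    def add(self, value):
        """Record a new value and return the current average."""
        self._values.append(float(value))
        return sum(self._values) / len(self._values)


if __name__ == "__main__":
    eth_average = RollingAverage(size=10)
    for close in ["1600.0", "1602.0", "1604.0"]:
        latest = eth_average.add(close)
    print(latest)  # 1602.0
```

This avoids a database round trip for a value the bot may need on every candle, while the SQL query remains the better fit for ad hoc analysis.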

Possible Improvements

  • Dockerize the script and database for easier setup and running in production.
  • Use Python to query data from Postgres DB.
  • Use Python to plot data from Postgres DB.
  • Add more pairs to the script.
  • Implement caching, e.g. Redis, for quicker read/write operations.
  • Add a notification system, e.g. SMS/email, that fires when the price reaches a required threshold or a candle closes.

Conclusion

This is just a simple outline for a crypto bot, and there are a lot of improvements that can be made. You can find the code for this project on GitHub.

I hope this article was helpful and you were able to learn something new. If you have any questions or suggestions please feel free to reach out to me on Twitter or LinkedIn.