Integrating Quant Data, InfluxDB, and Python

After installing InfluxDB 1.8 on my MacBook, this follow-up post shows how I integrated InfluxDB with Python for storing and retrieving quant data.

InfluxDB Basic Concepts

The InfluxDB 1.8 Getting Started documentation was helpful in teaching me the basic concepts and commands needed to create a database, store and query some data, and then drop the database. Here's my log, starting from the shell prompt:

$ influx
Connected to http://localhost:8086 version 1.8.3
InfluxDB shell version: 1.8.3
> show databases
name: databases
name
----
_internal
> create database tester
> show databases
name: databases
name
----
_internal
tester
> use tester
Using database tester
> INSERT cpu,host=serverA,region=us_west value=0.64
> SELECT "host", "region", "value" FROM "cpu"
name: cpu
time                host    region  value
----                ----    ------  -----
1606583953911766000 serverA us_west 0.64
> drop database tester
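
For reference, the INSERT above uses InfluxDB's line protocol: cpu is the measurement, host=serverA,region=us_west is the tag set, value=0.64 is a field, and the timestamp defaults to the current time because none was supplied.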

To prepare for the next part of the journey, I then created a database and a user (hidden by ***).

> create database quant
> create user *** with password '***' WITH ALL PRIVILEGES

Using InfluxDB with Python

My intent is to do Python-based machine learning with quant data. Here's my log of getting InfluxDB working with both Python and Pandas.

Installing the InfluxDB Python Client Library

To use InfluxDB with Python and Pandas, you need to install an InfluxDB Python client library. Google didn't turn up a conda install option, so I used pip instead after activating my preferred conda environment. Note that influxdb-client is the client for InfluxDB 2.x, while the older influxdb package (which provides the DataFrameClient used below) targets 1.x; I installed both:

$ pip install influxdb-client
$ pip install influxdb
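
As a quick sanity check before going further, here's a minimal sketch that connects the 1.x influxdb client to the local server, pings it, and lists the existing databases. The *** placeholders stand for the credentials created earlier.

# Verify the connection to the local InfluxDB 1.8 server
from influxdb import InfluxDBClient

check = InfluxDBClient(host='localhost', port=8086, username='***', password='***', database='quant')
print(check.ping())                # server version string, e.g. '1.8.3'
print(check.get_list_database())   # should include the 'quant' database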

Storing and Retrieving Quant Data in InfluxDB with Python

Here's a code snippet to fetch stock data from Yahoo! Finance into a Pandas DataFrame and then store the basic data into the quant database, inspired by an InfluxDB community forum conversation and the InfluxDB Python Examples.

# Basic libraries
import numpy as np
import pandas as pd
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)
import datetime as dt
import yfinance as yf
from influxdb_client import InfluxDBClient  # InfluxDB 2.x client (installed above, not used in this snippet)
from influxdb import DataFrameClient        # InfluxDB 1.x client with Pandas DataFrame support

# Variables
local_db = "quant"
symbol = "^GSPC".upper()
period, years = '1wk', 5

# start and end dates (end is set to tomorrow so yfinance's exclusive end date includes today)
today = (dt.date.today() + dt.timedelta(days=1)).strftime("%Y-%m-%d")
end_date = today
start_date = (pd.Timestamp.now() - pd.DateOffset(years=years)).strftime("%Y-%m-%d")

# get data for symbol
df = yf.download([symbol], start=start_date, end=end_date, interval=period, auto_adjust=False)

# Add some features
df['Market'] = 'SP500'
df['Symbol'] = symbol
df['Period'] = period
print(df.tail())


df.info()

<class 'pandas.core.frame.DataFrame'>
DatetimeIndex: 262 entries, 2015-11-30 to 2020-11-27
Data columns (total 8 columns):
 #   Column     Non-Null Count  Dtype  
---  ------     --------------  -----  
 0   Open       262 non-null    float64
 1   High       262 non-null    float64
 2   Low        262 non-null    float64
 3   Close      262 non-null    float64
 4   Adj Close  262 non-null    float64
 5   Volume     262 non-null    int64  
 6   Market     262 non-null    object 
 7   Symbol     262 non-null    object 
dtypes: float64(5), int64(1), object(2)
memory usage: 18.4+ KB

# Create an InfluxDB connection to the quant database (client)
client = DataFrameClient(host='localhost', port=8086, username='***', password='***', database='quant')

# Write the entire df to a measurement named 'market_data'
client.write_points(dataframe=df, measurement='market_data', tag_columns=['Market', 'Symbol'])

# Get the first 10 rows of the market_data measurement
client.query("select * from market_data limit 10")

defaultdict(list,
            {'market_data':                              Adj Close        Close         High          Low Market         Open Period Symbol       Volume
             2015-11-30 00:00:00+00:00  2091.689941  2091.689941  2104.270020  2042.349976  SP500  2090.949951    1wk  ^GSPC  20459190000
             2015-12-07 00:00:00+00:00  2012.369995  2012.369995  2090.419922  2008.800049  SP500  2090.419922    1wk  ^GSPC  20618850000
             2015-12-14 00:00:00+00:00  2005.550049  2005.550049  2076.719971  1993.260010  SP500  2013.369995    1wk  ^GSPC  24611890000
             2015-12-21 00:00:00+00:00  2060.989990  2060.989990  2067.360107  2005.930054  SP500  2010.270020    1wk  ^GSPC  12177090000
             2015-12-28 00:00:00+00:00  2043.939941  2043.939941  2081.560059  2043.619995  SP500  2057.770020    1wk  ^GSPC  10057270000
             2016-01-04 00:00:00+00:00  1922.030029  1922.030029  2038.199951  1918.459961  SP500  2038.199951    1wk  ^GSPC  22089690000
             2016-01-11 00:00:00+00:00  1880.329956  1880.329956  1950.329956  1857.829956  SP500  1926.119995    1wk  ^GSPC  25291150000
             2016-01-18 00:00:00+00:00  1906.900024  1906.900024  1908.849976  1812.290039  SP500  1888.660034    1wk  ^GSPC  21324990000
             2016-01-25 00:00:00+00:00  1940.239990  1940.239990  1940.239990  1872.699951  SP500  1906.280029    1wk  ^GSPC  23703940000
             2016-02-01 00:00:00+00:00  1880.050049  1880.050049  1947.199951  1872.229980  SP500  1936.939941    1wk  ^GSPC  24081930000})

Nice, things are looking good!
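
Note that the DataFrameClient returns query results as a dictionary keyed by measurement name, with each value already a Pandas DataFrame. Here's a minimal sketch of pulling the stored rows back out and working with them as a DataFrame again; the tag and column names match what was written above.

# Query the measurement and unpack the result into a DataFrame
result = client.query("select * from market_data where \"Symbol\" = '^GSPC'")
df_back = result['market_data']

# The index holds the point timestamps; tags and fields come back as columns
print(df_back[['Open', 'High', 'Low', 'Close', 'Volume']].tail())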

Schema-Less, Duplicate-Less InfluxDB Test

I've been having fun with quant trading for quite some time, and I know I'll always come up with a new idea that requires changes to my database schema and table definitions. That's why I wanted to try a schema-less database this time around, and not have to worry about traditional relational database chores like countless ALTER TABLE statements, resolving duplicate rows, and so on. Is InfluxDB going to make my life easier? Here's another Python snippet.

client.query("select count(*) from market_data")

defaultdict(list,
            {'market_data':                            count_Adj Close  count_Close  count_High  count_Low  count_Open  count_Volume
             1970-01-01 00:00:00+00:00              262          262         262        262         262           262})

# Add a simple moving average feature
df['SMA_10'] = df['Close'].rolling(window=10).mean()

# Rewrite the entire df to the 'market_data' measurement, now including the SMA_10 field
client.write_points(dataframe=df, measurement='market_data', tag_columns=['Market', 'Symbol', 'Period'])

# Re-run the count query to compare the per-field row counts
client.query("select count(*) from market_data")

defaultdict(list,
            {'market_data':                            count_Adj Close  count_Close  count_High  count_Low  count_Open  count_SMA_10  count_Volume
             1970-01-01 00:00:00+00:00              262          262         262        262         262           253           262})

Sweet! Notice that there are still 262 rows before and after adding the simple moving average feature, and the new SMA_10 field appeared without any schema changes (its count is 253 because the first nine rolling-window values are NaN and aren't written). Thanks, InfluxDB: now I can iterate on my quant data without nagging database concerns slowing me down.
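
If you want to double-check the duplicate-less part, here's a minimal sketch: because InfluxDB identifies a point by its measurement, tag set, and timestamp, re-writing the same DataFrame should overwrite the existing points rather than append duplicates.

# Write the exact same DataFrame again
client.write_points(dataframe=df, measurement='market_data', tag_columns=['Market', 'Symbol', 'Period'])

# The per-field counts should be unchanged (still 262, and 253 for SMA_10)
print(client.query("select count(*) from market_data"))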

Related Posts

My Experience Installing InfluxDB on Mac OS X Big Sur