Apache Spark and stock price causality

The Challenge

I wanted to compute Granger causality (described below) for each pair of stocks listed in the New York Stock Exchange. Moreover, I wanted to analyze between one and thirty lags for each pair’s comparison. Needless to say, this requires massive computing power. I used Amazon EC2 as the computing platform, but needed a smooth way to architect the computation and parallelize it. Therefore I turned to Apache Spark (also described below). Code implementing the computation is included at the bottom of this text.

Granger Causality

Granger causality provides a statistical measure of how much information in one time series predicts the information in another. It works by examining the correlation between the two series where the expected predictor series is lagged by a specified number of time points. The result is a test statistic indicating the degree of Granger causality. Note that this is not a measure of true causality, as both time series might be actually caused by a third process.

Consider for example the daily change in closing prices of stocks BQH and WMB. Using the Python StatsModels package’s Granger test procedure (code below), I computed a p-value of 2.205e-26 for the null hypothesis that the daily change in BQH’s closing price does not Granger cause the daily change in WMB’s closing price at five lags. I concluded therefore that change in BQH does Granger cause change in WMB at a lag of five trading days. I then plotted the two normalized changes in price (lagged by five days for BQH) for 20 trading days. In the plot below one can see that the WMB price change and the five-day lagged BQH price change move in the same direction on 14 of the 20 days, indicating that the change in BQH’s closing price might be a useful predictor of the future change in WMB’s closing price.

[Plot: normalized daily price changes for WMB and five-day lagged BQH over 20 trading days]
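
For readers who want to reproduce a single-pair test like this one outside of Spark, here is a minimal sketch. The file names, column layout, and the use of two separate CSV files are assumptions for illustration only; the full Spark pipeline appears later in the post.

# Illustrative single-pair sketch (not the pipeline code below): test whether
# BQH's daily close changes Granger cause WMB's daily close changes at up to five lags.
# 'bqh.csv' and 'wmb.csv' are hypothetical files with 'Date' and 'Close' columns.
import pandas as pd
from statsmodels.tsa import stattools

bqh = pd.read_csv('bqh.csv', index_col='Date', parse_dates=True)['Close'].sort_index().diff().dropna()
wmb = pd.read_csv('wmb.csv', index_col='Date', parse_dates=True)['Close'].sort_index().diff().dropna()

# statsmodels tests whether the series in the SECOND column Granger causes the first,
# so the candidate effect (WMB) goes first and the candidate cause (BQH) second.
both = pd.concat([wmb, bqh], axis=1, join='inner').values
result = stattools.grangercausalitytests(both, maxlag=5, verbose=False)
print(result[5][0]['ssr_ftest'][1])   # p-value of the F test at lag 5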

Apache Spark

Apache Spark is the new “it thing” in data science these days, so I thought it a good idea to learn it. Essentially it generalizes MapReduce, typically running much faster than Hadoop MapReduce, and it provides Java, Scala, and Python interfaces. It is not limited to the two-step MapReduce procedure, easily allowing multiple map and reduce operations to be chained together, along with other useful data processing steps such as set unions, sorting, and generation of Cartesian products. The framework largely takes care of parallelization during these steps, though it offers some opportunity for fine tuning.

Apache Spark can be run on a local machine using multiple cores or on a cluster, but I’ve had more luck with parallelization using a single-worker cluster than using a local machine. The learning curve was comparable to that for Hadoop; writing applications that use the paradigm is quite easy once you “get it”. Map and reduce operations are specified as functions in one of the three languages listed above, and are parallelized by the various map and reduce commands provided by Spark. Variables can be shared across the parallel processes to aid procedures requiring storage of global values.
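
To illustrate the shared-variable point (this is separate from the stock analysis), here is a small sketch of the two sharing mechanisms Spark provides, a broadcast variable and an accumulator. It assumes a SparkContext named sc like the one created later in this post, and the lookup table is made up:

# Illustrative sketch only, not part of the Granger pipeline.
# A broadcast variable ships a read-only value to every worker once;
# an accumulator lets tasks add to a counter that the driver can read afterwards.
lookup = sc.broadcast({'IBM': 'tech', 'XOM': 'energy'})   # hypothetical lookup table
bad_rows = sc.accumulator(0)

def classify(symbol):
    if symbol not in lookup.value:
        bad_rows.add(1)
        return (symbol, 'unknown')
    return (symbol, lookup.value[symbol])

sectors = sc.parallelize(['IBM', 'XOM', 'ZZZ']).map(classify).collect()
print(sectors)
print(bad_rows.value)   # read on the driver after the action has run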

The Source Data

I started with daily closing stock prices for each trading day between January 1, 2000 and October 30, 2014, which I pulled from Yahoo Finance using Python’s Pandas package. I placed this data in one CSV file with the following format:

[Screenshot: sample rows of the source CSV file]
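
The download script itself is not part of this post, but a sketch of how such a file could be produced with the Pandas Yahoo Finance reader available at the time is below. The column order (symbol, date, open, high, low, close, volume) is an assumption chosen to be consistent with the parsing code that follows.

# Illustrative sketch: build the combined CSV with the pandas.io.data Yahoo reader.
import datetime
import pandas.io.data as web

symbols = ['BQH', 'WMB']   # in practice, the full NYSE symbol list
start = datetime.datetime(2000, 1, 1)
end = datetime.datetime(2014, 10, 30)

with open('data.csv', 'w') as out:
    for sym in symbols:
        quotes = web.DataReader(sym, 'yahoo', start, end)
        for date, row in quotes.iterrows():
            out.write('%s,%s,%f,%f,%f,%f,%d\n' % (
                sym, date.strftime('%Y-%m-%d'),
                row['Open'], row['High'], row['Low'],
                row['Close'], row['Volume']))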

The Code

Here is the Python code, with explanation, necessary to run the pairwise Granger causality computation in Apache Spark.

First, we import the necessary libraries:

from pyspark import SparkContext, SparkConf
import datetime
import pandas as pd
from statsmodels.tsa import stattools
import numpy as np

We next declare a SparkContext object with configuration settings. The “setMaster” argument is specific to the cluster you create. The ports listed in the port settings must be open on the cluster instances, a “gotcha” on EC2-based clusters especially:

conf = (SparkConf()
        .setMaster("spark://ip-123-123-123-123:7077")
        .setAppName("spark test")
        .set('spark.executor.memory', '3g')
        .set('spark.driver.port', 53442)
        .set('spark.blockManager.port', 54321)
        .set('spark.executor.port', 12345)
        .set('spark.broadcast.port', 22222)
        .set('spark.fileserver.port', 33333))
sc = SparkContext(conf=conf)

We then read the source data into a Spark data object:

combined_file = sc.textFile('/home/ec2-user/data.csv')

Next, we extract from each line the stock symbol as a string, the date as a datetime object, and the closing price as a floating point number:

def parse_line(line):
    # Each line is: symbol, YYYY-MM-DD date, ..., with the closing price in the sixth column
    fields = line.split(',')
    year, month, day = [int(x) for x in fields[1].split('-')]
    return ((fields[0], datetime.datetime(year, month, day)), float(fields[5]))

close = combined_file.map(parse_line)

We reorganize this dataset as a key-value set with the stock symbol as the key. We could have combined this with the last step, but the process is fast and I didn’t want to break working code.

close_by_symbol = close.map(lambda a: (a[0][0], (a[0][1], a[1])))

We now group each entry by stock symbol. This produces a key-value set where each key is a stock symbol and each value is an iterator containing a list of (date, price) tuples.

close_grouped_by_symbol = close_by_symbol.groupByKey()
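
A toy example, separate from the pipeline, may make the resulting structure clearer:

# Toy illustration of groupByKey (output order may vary).
toy = sc.parallelize([('IBM', ('2014-10-01', 101.0)),
                      ('IBM', ('2014-10-02', 102.5)),
                      ('XOM', ('2014-10-01', 94.3))])
print(toy.groupByKey().mapValues(list).collect())
# [('IBM', [('2014-10-01', 101.0), ('2014-10-02', 102.5)]),
#  ('XOM', [('2014-10-01', 94.3)])]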

We define a function to compute the one-day difference in closing prices for each symbol, since we are conducting the Granger causality analysis on the daily price differences rather than the prices themselves. The function sorts each symbol’s (date, price) tuples by date before differencing:

def make_diff(a):
    # a is (symbol, iterable of (date, price) tuples)
    datelist = []
    pricelist = []
    for i in a[1]:
        datelist.append(i[0])
        pricelist.append(i[1])
    # Sort both lists by date
    sorted_indices = [x[0] for x in sorted(enumerate(datelist), key=lambda q:q[1])]
    datelist_sorted = []
    pricelist_sorted = []
    for i in sorted_indices:
        datelist_sorted.append(datelist[i])
        pricelist_sorted.append(pricelist[i])
    # Compute one-day price differences, keyed to the later of the two dates
    price_diff = []
    diff_date = []
    for i in range(1, len(pricelist_sorted)):
        price_diff.append(pricelist_sorted[i] - pricelist_sorted[i-1])
        diff_date.append(datelist_sorted[i])
    return a[0], list(zip(diff_date, price_diff))  # list() so the result serializes under Python 3

Next, we execute a map operation using the function defined above to produce a dataset containing the one-day difference values:

close_diff_by_symbol = close_grouped_by_symbol.map(make_diff)

We have key-value pairs where the keys are stock symbols and the values are lists of (date, price difference) tuples. We now want the Cartesian product of this data set against itself, so that every stock symbol’s price difference data (with dates) is matched to every other stock symbol’s price difference data. Apache Spark really shines at this task, providing the “cartesian” function for the purpose:

cartesian_matched = close_diff_by_symbol.cartesian(close_diff_by_symbol)
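
To make the shape of the result concrete, here is a toy example of cartesian with made-up values; note that self-pairs such as ('A', 'A') are included:

# Toy illustration of cartesian (output order may vary).
toy = sc.parallelize([('A', [0.5]), ('B', [-0.2])])
print(toy.cartesian(toy).collect())
# [(('A', [0.5]), ('A', [0.5])), (('A', [0.5]), ('B', [-0.2])),
#  (('B', [-0.2]), ('A', [0.5])), (('B', [-0.2]), ('B', [-0.2]))]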

We define a function that will match the price difference values for each symbol pair by date. This returns a tuple (symbol 1, symbol 2, price difference list 1, price difference list 2), where the lists are aligned by trading date:

def match(a):
    # a is a pair of (symbol, [(date, price difference), ...]) tuples from the Cartesian product
    case_1 = a[0]
    case_2 = a[1]
    symbol1 = case_1[0]
    symbol2 = case_2[0]
    ts1 = case_1[1]
    ts2 = case_2[1]
    time1 = []
    price1 = []
    for i, j in ts1:
        time1.append(i)
        price1.append(j)
    time2 = []
    price2 = []
    for i, j in ts2:
        time2.append(i)
        price2.append(j)
    # Build date-indexed series and align them on common trading dates with an inner join
    series_1 = pd.Series(price1, index=time1)
    series_2 = pd.Series(price2, index=time2)
    df_1 = pd.DataFrame(series_1, columns=['1'])
    df_2 = pd.DataFrame(series_2, columns=['2'])
    merged = df_1.join(df_2, how='inner')
    return (symbol1, symbol2, merged['1'].tolist(), merged['2'].tolist())

We then execute this function in a map operation:

matched = cartesian_matched.map(match)

We can now define our function to compute Granger causality from the date-matched pairs of price differences for each stock symbol pair. Lag values are used as keys in the dictionary of p-values that the function returns:

def gc(a):
    m = 30  # maximum number of lags to test
    s1 = a[0]
    s2 = a[1]
    list1 = a[2]
    list2 = a[3]
    # The price differences are floats; casting to np.int64 would truncate most of them to zero
    test_array = np.array([list1, list2], np.float64)
    p_ssr_ftest_dict = {}
    try:
        results = stattools.grangercausalitytests(test_array.T, m, verbose=False)
        for q in results.keys():
            p_ssr_ftest_dict[q] = results[q][0]['ssr_ftest'][1]
    except Exception:
        # If the test fails (for example, too few matched dates), record None for every lag
        for q in range(1, m+1):
            p_ssr_ftest_dict[q] = None
    return (s1, s2, p_ssr_ftest_dict)

We execute this function in a map operation:

symbol_pair_gc = matched.map(gc)

We define a function for converting the results to CSV format:

def report(a):
    s1 = a[0]
    s2 = a[1]
    fdict = a[2]
    report_list = []
    for q in sorted(fdict.keys()):
        report_list.append(str(fdict[q]))
    report_string = s1 + ',' + s2 + ',' + ','.join(report_list)
    return report_string

We execute a map operation using this function to create CSV output:

report_csv = symbol_pair_gc.map(report)

Finally, we save the results to a file. Note that when running on a cluster, the path “/home/ec2-user/output/gc_output/_temporary” must exist on the worker nodes before starting the program, and “/home/ec2-user/output/gc_output” must not exist on the head node at start time.

report_csv.saveAsTextFile('/home/ec2-user/output/gc_output')

The Output

The program generates Granger causality test p-values for each stock symbol pair at lags 1 through 30, e.g.,

[Screenshot: sample output rows, each listing a symbol pair followed by p-values for lags 1 through 30]
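
To get a quick look at the strongest candidates, here is a minimal post-processing sketch, run outside of Spark, that scans the saved part files for pairs with very small p-values at a chosen lag. The output path matches the one above, but the lag and the threshold are arbitrary choices for illustration.

# Illustrative sketch: scan the output for symbol pairs with tiny p-values at one lag.
import glob

lag = 5
for path in glob.glob('/home/ec2-user/output/gc_output/part-*'):
    for line in open(path):
        fields = line.strip().split(',')
        s1, s2 = fields[0], fields[1]
        p = fields[1 + lag]   # p-values for lags 1..30 start in the third column
        if p != 'None' and float(p) < 1e-20:
            print('%s,%s,%s' % (s1, s2, p))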

Conclusion

There you have it: a way to compute pairwise Granger causality for every stock pair on the New York Stock Exchange using Apache Spark. I’ll let you know if an investment strategy emerges from this work.
