simulating RNA-seq read counts

The Challenge

I want to explore the statistics of RNA sequencing (RNA-seq) on next-generation sequencing (NGS) platforms in greater detail, so I thought I’d start by simulating read counts to experiment with. This post details how I constructed a simulated set of read counts, examines its concordance with the expected negative binomial distribution of the counts across replicates, and then tests DESeq2’s analysis of the simulated read counts to determine how well it matches the expected outcome. Code implementing the calculations follows the description.

Constructing Simulated Read Counts

I first randomly sampled 1000 transcripts from RefSeq to obtain a list of representative transcript lengths, ensuring that no gene is represented twice in the sample (i.e., no gene contributes more than one isoform). The resulting list and distribution of lengths look like:

transcript_counts

length_distribution
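The RefSeq sampling step itself is not included in the code at the end of this post. Here is a minimal sketch of one way to do it with pandas, assuming a hypothetical refseq_transcripts.csv file with "gene", "accession", and "length" columns (the file name and column names are illustrative, not part of the original pipeline):

import pandas as pd

# hypothetical input: one row per RefSeq transcript, with its parent gene and length
refseq = pd.read_csv('refseq_transcripts.csv')

# keep a single isoform per gene so that no gene appears twice,
# then randomly select 1000 of the remaining transcripts
# (assumes the table contains at least 1000 distinct genes)
one_isoform_per_gene = refseq.drop_duplicates(subset='gene')
sampled = one_isoform_per_gene.sample(n=1000, random_state=2)

# write transcript index and length in the layout the simulation code at the end of this post expects
out = sampled[['length']].reset_index(drop=True)
out.index.name = 'transcript'
out.to_csv('data/transcript_lengths.csv')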

I then simulated a relative abundance metric for the 1000 transcripts using a gamma distribution. This relative abundance metric is completely unrelated to the true expression levels of the genes whose length information I drew on; it is simply a randomly selected value assigned to each transcript length. (This is why I label the transcripts "0" to "999" instead of by their original RefSeq accession numbers: at this point in the creation of simulated reads we are departing from prior biological information.) The distribution of the relative abundance metric is shown below. The values on the X axis are only meaningful in relation to each other; "relative" here means each transcript's abundance compared with the others, not "relative" across biological samples or technical replicates.

gamma_distribution

At this point I duplicated the data into two sets: length/abundance information for condition "A" and length/abundance information for condition "B". Condition "B" is an exact copy of condition "A" except that the abundance metric is halved for approximately 10% of its transcripts and doubled for another approximately 10%. These conditions represent the two simulated biological samples we are comparing. For each condition I then multiplied the abundance metric by 20 to increase the dynamic range and converted the result to an integer.

For each transcript and each condition, I then multiplied the transcript length by the scaled abundance metric to get a relative amount of sequence that a read might come from. For each transcript, I added the transcript's label to an array for the given condition that many times; we later sample from these arrays to generate read counts. Call these arrays "available hits A" and "available hits B" for the respective conditions.

I wanted to create three replicates of each biological condition, labeled "A1", "A2", "A3", "B1", "B2", and "B3". For each replicate, I randomly selected a sample size (total reads) between 12 and 15 million. I then sampled from "available hits A" three times and from "available hits B" three times using the randomly selected sample sizes, and counted the number of times each transcript was selected. The generated counts data look like:

simulated_counts

To confirm that the simulation worked correctly, I plotted the log2 of counts A1 versus the log2 of counts B1, by transcript, producing the following graph. It is clear from the plot that roughly 10% of the B1 transcripts sit one log2 unit below the corresponding A1 value, and roughly 10% sit one log2 unit above it, as expected.

log2_counts_per_transcript

Python code implementing the computations described above is appended to the end of this blog post.

Distribution of Transcript Counts Across Replicates

For each transcript whose nominal expression level was the same across both biological conditions (i.e., no doubling or halving took place), I calculated maximum likelihood estimates of the mean and variance of the simulated replicate counts under a negative binomial model. I then plotted the estimated mean versus the estimated variance:

mean_vs_variance_neg_binom

From the image we see that the estimated variance grows faster than the estimated mean, i.e., the counts are overdispersed, as expected. This suggests that a negative binomial distribution is a reasonable model for the replicates. I plan to apply a more formal goodness-of-fit test to confirm this, but that will have to wait until a later blog post.

DESeq2 Analysis

I next ran DESeq2 analysis on the two simulated biological samples’ sets of read counts, producing the following output:

R_output

The first block in the above image shows the simulated read counts for transcripts zero through four. We see that transcripts zero and one have “B” expression that is roughly twice that of their “A” expression. We also see that transcript four has “B” expression that is approximately half that of its “A” expression.

The second block in the image above shows that a log2 fold change of one is detected by DESeq2 for transcripts zero and one, and that a log2 fold change of negative one is detected for transcript four. This matches our expectation given the read counts listed in the first block. The p-values for these assessments of fold change are very small, indicating significance.

The third block in the image above shows a summary of all 1,000 transcripts’ results. We see that approximately 10% are upwardly differentially expressed between “A” and “B” and that approximately 10% are downwardly differentially expressed between “A” and “B”. This matches our original simulation design.

R code implementing these calculations follows.

Code

Python code used to produce the simulated counts:

# load useful libraries
from scipy.stats import gamma
import matplotlib.pyplot as plt
import random
import numpy as np

# user settings
number_of_transcripts = 1000
gamma_alpha = 2.7

# set the random seeds for repeatability
random.seed(2)
np.random.seed(seed=2)

# generate gamma-distributed transcript "expression" levels (relative expression, not absolute)
r = gamma.rvs(gamma_alpha, size=number_of_transcripts)
r1_list = r.tolist()
plt.hist(r1_list, color="lightblue")
plt.title("Simulated Distribution of Relative Abundance for 1000 Transcripts")
plt.xlabel("Simulated Relative Abundance")
plt.ylabel("Frequency")
plt.show()

# double 10% and halve 10% of the relative "expression" levels to simulate a different "biological" circumstance or cell
r2_list = []
for r1 in r1_list:
    r2 = r1
    unif = random.uniform(0, 1)
    if unif >= 0.8:
        r2 = r1 / 2.
    if unif >= 0.9:
        r2 = r1 * 2.
    r2_list.append(r2)

# multiply the "expression" levels by 20 to increase dynamic range and convert them to integers
r1_int = []
r2_int = []
for r1, r2 in zip(r1_list, r2_list):
    r1_int.append(int(20. * r1))
    r2_int.append(int(20. * r2))
plt.hist(r1_int)
plt.show()
plt.hist(r2_int)
plt.show()

# write the "known" expression levels to disk so that we may compare them to DESeq2 calculations
f = open('data/transcript_lengths.csv')
f_out = open('output/expression.csv', 'w')
f_out.write('transcript,length,expression_A,expression_B\n')
length = []
transcript = []
for i, line in enumerate(f):
    line = line.strip()
    if line.find('transcript') >= 0:  continue
    f_out.write(line + ',' + str(r1_int[i-1]) + ',' + str(r2_int[i-1]) + '\n')
    length.append(int(line.split(',')[1]))
    transcript.append(int(line.split(',')[0]))
f_out.close()
f.close()
plt.hist(length, color="lightblue")
plt.title("Distribution of Lengths for 1000 Selected Transcripts")
plt.xlabel("Length (Nucleotides)")
plt.ylabel("Frequency")
plt.show()

# create populations to sample from that reflect transcript size and relative abundance
r1 = []
for i, j, k in zip(transcript, length, r1_int):
    for n in range(0, j * k):
        r1.append(i)
r2 = []
for i, j, k in zip(transcript, length, r2_int):
    for n in range(0, j * k):
        r2.append(i)

# simulate read hits to transcripts:  generate three replicates for each of the two "biological" samples
samples = {1 : {}, 2: {}}
samples[1]['A'] = random.sample(r1, random.randint(12000000, 15000000))
samples[1]['B'] = random.sample(r1, random.randint(12000000, 15000000))
samples[1]['C'] = random.sample(r1, random.randint(12000000, 15000000))
samples[2]['A'] = random.sample(r2, random.randint(12000000, 15000000))
samples[2]['B'] = random.sample(r2, random.randint(12000000, 15000000))
samples[2]['C'] = random.sample(r2, random.randint(12000000, 15000000))

# count hits to transcripts
counts = {}
for bio in samples.keys():
    if not counts.has_key(bio):  counts[bio] = {}
    for replicate in samples[bio].keys():
        if not counts[bio].has_key(replicate):  counts[bio][replicate] = {}
        for t in samples[bio][replicate]:
            if not counts[bio][replicate].has_key(t):
                counts[bio][replicate][t] = 0
            counts[bio][replicate][t] += 1

# write simulated counts to file
f_out = open('output/counts.csv', 'w')
f_out.write('transcript,length,A1,A2,A3,B1,B2,B3\n')
for i, ln in enumerate(length):
    f_out.write(str(transcript[i]) + ',' + str(ln) + ',' + str(counts[1]['A'][i]) + ',' + str(counts[1]['B'][i]) + ',' + str(counts[1]['C'][i]) + ',' + str(counts[2]['A'][i]) + ',' + str(counts[2]['B'][i]) + ',' + str(counts[2]['C'][i]) + '\n')
f_out.close()

R code used to conduct differential expression analysis:

# the following relies heavily on the text at:
# http://www.bioconductor.org/help/workflows/rnaseqGene/

# clear memory
rm(list=ls())

# load useful libraries
library(DESeq2)
library(gplots)
library(RColorBrewer)
library(MASS)

# load counts data into memory and generate matrix
raw.counts.data <- read.csv("counts.csv")
str(raw.counts.data)
raw.counts.matrix <- as.matrix(raw.counts.data[,c(3:8)])
rownames(raw.counts.matrix) <- raw.counts.data$transcript

# specify how the columns in the matrix are related
coldata <- data.frame(experiment=c("A", "A", "A", "B", "B", "B"))
rownames(coldata) <- c("A1", "A2", "A3", "B1", "B2", "B3")

# create a DESeqDataSet
dds <- DESeqDataSetFromMatrix(countData = raw.counts.matrix,
                              colData = coldata,
                              design = ~ experiment)

# normalize
dds <- estimateSizeFactors(dds)

# plot counts per transcript
plot(log2(1 + counts(dds, normalized=TRUE)[,c(1,4)]), pch=16, cex=0.4, col="blue",
	xlab="Log2 Counts for Sample A1",
	ylab="Log2 Counts for Sample B1",
	main="Counts per Transcript:  Sample A1 vs. Sample B1"
)

# generate the regularized-logarithm transformation
rld <- rlog(dds)

# similarity by clustering
sampleDists <- dist(t(assay(rld)))
sampleDistMatrix <- as.matrix( sampleDists )
rownames(sampleDistMatrix) <- paste( rld$experiment, colnames(rld), sep="-" )
colors <- colorRampPalette( rev(brewer.pal(9, "Blues")) )(255)
hc <- hclust(sampleDists)
heatmap.2( sampleDistMatrix, Rowv=as.dendrogram(hc),
          symm=TRUE, trace="none", col=colors,
          margins=c(2,10), labCol=FALSE )

# run differential expression analysis
dds <- DESeq(dds)

# view results
res <- results(dds)
res
summary(res)

# test negative Binomial mean vs. variance
MU <- c()
VAR <- c()
for (i in 1:1000) {
    X <- raw.counts.data[i,3:8]
    ratio <- X[1] / X[4]
    if (0.70 <= ratio && ratio <= 1.3) {
        fit <- fitdistr(as.numeric(X), "negative binomial")
        size <- fit$estimate[1]
        mu <- fit$estimate[2]
        variance <- mu + (mu^2) / size
        MU <- c(MU, mu)
        VAR <- c(VAR, variance)
    }
}
plot(MU, VAR, main="Mean vs. Variance of Negative Binomial Fit of Read Counts, per Transcript", xlab="Mean", ylab="Variance", pch=16, cex=0.4, col="blue")

sending an attachment with Amazon SES using Java

Sometimes data scientists need to write software that sends out automated e-mails, and sometimes those e-mails must carry attachments. Here is how to do so in Java using Amazon’s Simple Email Service (SES), which is an inexpensive outbound email service built on Amazon’s cloud infrastructure.

Note: Be sure to log into Amazon SES and verify the recipient domain for your chosen host region before trying to run these steps. In the code below, the host region is set to "us-west-2".

First, load the necessary libraries:

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Properties;
import javax.mail.Session;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import javax.activation.DataHandler;
import javax.mail.BodyPart;
import javax.mail.MessagingException;
import javax.mail.Multipart;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMultipart;
import javax.mail.util.ByteArrayDataSource;
import com.amazonaws.services.simpleemail.AWSJavaMailTransport;

Next, specify the content of the e-mail and Amazon AWS credentials:

String fileToAttachPath = "/home/username/test.xls";
String fileNameInEmail = "test.xls";
String fileMime = "application/excel";
String from = "email address";
String to = "email address"; 
String body = "This is an e-mail text.";
String bodyMime = "text/plain";
String subject = "This is a subject.";
String accessKeyId = "Your AWS access key ID";
String secretKey = "Your AWS secret key";
String host = "email.us-west-2.amazonaws.com";

Load the file to attach into memory:

byte[] fileContent = null;
File file = new File(fileToAttachPath);
FileInputStream fin = null;
try {
	fin = new FileInputStream(file);
	fileContent = new byte[(int) file.length()];
	fin.read(fileContent);
} catch (FileNotFoundException e) {
	System.out.println("File not found.");
} catch (IOException ioe) {
	System.out.println("Exception reading file.");
} finally {
	try {
		if (fin != null) {
			fin.close();
		}
	} catch (IOException ioe) {
		System.out.println("Error while closing file.");
	}
}

Create a properties object for use by the AWS session:

Properties props = new Properties();
props.setProperty("mail.transport.protocol", "aws");
props.setProperty("mail.aws.user", accessKeyId);
props.setProperty("mail.aws.password", secretKey);
props.setProperty("mail.aws.host", host);

Create the e-mail message and transport:

Session AWSsession = Session.getInstance(props);
AWSJavaMailTransport AWSTransport = new AWSJavaMailTransport(AWSsession, null);
AWSTransport.connect();
MimeMessage message = new MimeMessage(AWSsession);
message.setFrom(new InternetAddress(from));
message.addRecipient(MimeMessage.RecipientType.TO, new InternetAddress(to));
message.setSubject(subject);

Generate the body of the e-mail message:

Multipart multipart = new MimeMultipart();
BodyPart messageBodyPart = new MimeBodyPart();
messageBodyPart.setContent(body, bodyMime);
multipart.addBodyPart(messageBodyPart);

Add the attachment:

messageBodyPart = new MimeBodyPart();
ByteArrayDataSource source = new ByteArrayDataSource(fileContent, fileMime);
messageBodyPart.setDataHandler(new DataHandler(source));
messageBodyPart.setFileName(fileNameInEmail);
multipart.addBodyPart(messageBodyPart);

Finally, send the message:

message.setContent(multipart);
message.saveChanges();
AWSTransport.sendMessage(message, null);

reporting negative results

Two of my recent posts have reported negative results, meaning that no meaningful effects were found during the investigations. Had these investigations been framed as hypothesis tests, we would have failed to reject the null hypotheses. Sounds boring.

However, there are good reasons to report these results. The first is that negative results still generate knowledge about a system or process; for instance, you might learn that your hypothesized explanatory variables do not explain the variance in the data, which might send you back to the drawing board to look for new explanatory variables. Reporting this progress enables others to move faster by steering them away from strategies you have already tried.

Second, suppose an experimental result published in a journal is significant at the 5% level. Under the null hypothesis, there is a one-in-twenty chance that the null was incorrectly rejected based on the data. Reporting additional experiments, whether in publications or in blog posts on the web, helps mitigate the risk that one paper's possibly erroneous conclusion biases a whole line of research. This is vital because many journals publish only positive results, so it falls to alternative channels of communication (like blogging) to complete the picture.

Finally, I use this blog to keep track of where I've been in my analyses and to help guide where I'm going next. Among other things, I treat this blog like an enhanced lab journal, which is why I post code for most of what I do. A good lab journal keeps track of negative results, so I do the same.

Scientific inquiry often leads to dead ends; that is part of the game. Reporting such negative outcomes is part of the game as well.


net change of zero between closing and opening stock prices

I decided to investigate the variation between trading days’ closing prices and the following trading days’ opening prices for stocks listed on the New York Stock Exchange. I started with data in the following format for all trading days between January 2nd 2000 and October 30th 2014:

source_data_POST_CROP

I then calculated the percent change between one day’s closing price and the next day’s opening price for each day and each stock symbol in the data set, compiling these values into lists according to opening day. Finally, I plotted the list values in the following box plot:

percent_change_between_close_and_open__NO_FLIERS

In the above plot the outliers are not shown (they are shown further below). The median percent change for each day is approximately zero; statistical significance tests suggested the median is not exactly zero, but for practical purposes it is.

For the sake of completeness, here is the box plot with outliers included:

percent_change_between_close_and_open

Here we see that there are heavy tails on the right side of the distributions. However, for the sake of developing a trading strategy, I’m focusing on the interquartile range centered on zero.

What’s Missing

I visually checked two stocks' percent price change time series for autocorrelation using autocorrelation plots and found no evidence of autocorrelation. However, this was not a comprehensive analysis covering all the stocks traded on the NYSE. The statistics course I am taking right now will soon cover a hypothesis test for autocorrelation that I will use in future analyses.
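Until then, one standard option is the Ljung-Box test from statsmodels. A minimal sketch, using a simulated stand-in series rather than the actual close-to-open percent changes:

import numpy as np
import pandas as pd
from statsmodels.stats.diagnostic import acorr_ljungbox

# hypothetical stand-in for one stock's percent-change series
pct_change_series = pd.Series(np.random.normal(0.0, 0.01, size=500))

# null hypothesis: no autocorrelation at lags 1 through 10; small p-values
# would indicate autocorrelation (older statsmodels versions return arrays
# of statistics and p-values, newer versions return a DataFrame)
print(acorr_ljungbox(pct_change_series, lags=10))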

Code

The following Python code implements the above described calculation:

#
# load useful libraries
#
import datetime
import pandas as pd
import matplotlib.pyplot as plt
from scipy.stats import wilcoxon
from scipy.stats.mstats import kruskalwallis
import numpy as np

#
# load data
#
data = {}
f = open('source_data.csv')
for i, line in enumerate(f):
    line = line.strip()
    symbol = line.split(',')[0]
    date_string = line.split(',')[1]
    open_price = float(line.split(',')[2])
    close_price = float(line.split(',')[5])

    year = int(date_string.split('-')[0])
    month = int(date_string.split('-')[1])
    day = int(date_string.split('-')[2])
    date = datetime.datetime(year, month, day, 0, 0)

    if not data.has_key(symbol):
        data[symbol] = {'date' : [], 'open' : [], 'close' : [], 'weekday' : []}
    data[symbol]['date'].append(date)
    data[symbol]['open'].append(open_price)
    data[symbol]['close'].append(close_price)
    data[symbol]['weekday'].append(date.date().weekday())
f.close()

#
# convert to pandas dataframe, this ensures arrays are aligned by date
#
for symbol in sorted(data.keys()):
    closing_prices = data[symbol]['close']
    opening_prices = data[symbol]['open']
    weekdays = data[symbol]['weekday']
    date = data[symbol]['date']

    pd_closing_prices = pd.Series(closing_prices, index=date)
    pd_opening_prices = pd.Series(opening_prices, index=date)
    pd_weekdays = pd.Series(weekdays, index=date)

    d = {'close' : pd_closing_prices,
         'open' : pd_opening_prices,
         'weekday' : pd_weekdays}
    df = pd.DataFrame(d)

    del(data[symbol]['close'])
    del(data[symbol]['open'])
    del(data[symbol]['date'])
    del(data[symbol]['weekday'])

    data[symbol] = df

#
# compute percent change
#
for symbol in sorted(data.keys()):
    df = data[symbol]
    closing_prices = data[symbol]['close']
    opening_prices = data[symbol]['open']
    weekdays = data[symbol]['weekday']
    date_index = data[symbol].index

    percent_change_list = []
    for i in range(1, len(opening_prices)):
        price_at_open = opening_prices[i]
        price_at_close_day_before = closing_prices[i-1]
        weekday_of_open = weekdays[i]

        percent_change = (price_at_open - price_at_close_day_before) / price_at_close_day_before
        percent_change_list.append(percent_change)

    percent_change_series = pd.Series(percent_change_list, date_index[1:])
    data[symbol]['percent_change'] = percent_change_series

    #
    # test for autocorrelation
    #
    #plt.figure()
    #plt.acorr(percent_change_series, maxlags=30)
    #plt.show()

#
# generate percent change array by weekday for all stocks combined
#
all_stocks = {0 : [], 1 : [], 2 : [], 3 : [], 4 : []}
for symbol in data.keys():
    df = data[symbol]
    percent_change = df['percent_change']
    weekdays = df['weekday']

    for i in range(1, len(percent_change)):
        all_stocks[weekdays[i]].append(percent_change[i])

#
# plot
#
boxplot_data = [all_stocks[0], all_stocks[1], all_stocks[2], all_stocks[3], all_stocks[4]]
plt.figure(figsize=(11, 9))
plt.boxplot(boxplot_data, widths=0.9)
plt.xticks([1, 2, 3, 4, 5], ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'])
plt.xlabel('Opening Weekday')
plt.ylabel('Percent Change')
plt.title('Percent Change from Previous Day\'s Close Price to Open Price, by Opening Weekday')
plt.savefig('percent_change_between_close_and_open.png')
plt.close()

plt.figure(figsize=(11, 9))
plt.boxplot(boxplot_data, widths=0.9, showfliers=False)
plt.xticks([1, 2, 3, 4, 5], ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday'])
plt.xlabel('Opening Weekday')
plt.ylabel('Percent Change')
plt.title('Percent Change from Previous Day\'s Close Price to Open Price, by Opening Weekday')
plt.savefig('percent_change_between_close_and_open__NO_FLIERS.png')
plt.close()

#
# test for median of zero
#
print
for i, days_data in enumerate(boxplot_data):
    print i, wilcoxon(np.array(days_data))

#
# Kruskal-Wallis test for same medians
#
print
print kruskalwallis(np.array(boxplot_data[0]), np.array(boxplot_data[1]), np.array(boxplot_data[2]), np.array(boxplot_data[3]), np.array(boxplot_data[4]))

#
# print lengths of each array
#
print
for i, days_data in enumerate(boxplot_data):
    print i, len(days_data)
print

fuzzy logic toolkit in C++

I recently came across some old C++ code I wrote about 10 years ago to support fuzzy logic reasoning. The program is now posted on GitHub at https://github.com/badassdatascience/fuzzy-logic-toolkit and is described below. An example of the tool in action (with code) follows the description.

Fuzzy Logic

Suppose we have a numerical value for distance to a place of interest, and we want to fit it into one of the following categories: "very near", "near", "neither near nor far", "far", or "very far". To complicate matters, suppose that experts can't agree on what values of the distance define membership in each category, so they define functions that estimate the degree of membership in each category for each possible value of the distance. For example, such functions might place a given distance value at 5% "very near", 20% "near", and 75% "neither near nor far". Therefore, if one had to choose a category for a given distance, one could only specify "fuzzy" levels of membership in each category. This is the crux of fuzzy set theory.

Now suppose that some reasoning based on the selected category is required. Perhaps another fuzzy group of categories is involved, such as cost ("cheap", "modest", "expensive"). The reasoning could look like Boolean logic, e.g., IF "cheap" AND "near" THEN "eat lunch there". However, the boundaries for "cheap" and "near" are fuzzy, and therefore the computation is imprecise. This is fuzzy logic.
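To make the lunch example concrete, here is a minimal Python sketch (not part of the C++ toolkit described below) of evaluating the rule IF "cheap" AND "near" THEN "eat lunch there", using the common min/max convention for fuzzy AND and OR; the membership values are made up for illustration:

# degrees of membership obtained from (hypothetical) membership functions
distance_is_near = 0.75   # the distance is 75% "near"
cost_is_cheap = 0.40      # the cost is 40% "cheap"

# a common convention: fuzzy AND takes the minimum, fuzzy OR takes the maximum
def fuzzy_and(a, b):
    return min(a, b)

def fuzzy_or(a, b):
    return max(a, b)

# IF "cheap" AND "near" THEN "eat lunch there"
eat_lunch_there = fuzzy_and(cost_is_cheap, distance_is_near)
print(eat_lunch_there)  # 0.4: the conclusion holds to degree 0.4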

The code described below defines C++ classes for the membership functions that specify degree of membership in a category, a class for the categories themselves, and a class for compiling related categories into a domain. There is also a class for working with the numerical values of interest (e.g., distance in the case described above), enabling fuzzy Boolean operations on those values and estimating "crisp" (non-fuzzy) numerical conclusions from the fuzzy operations.

UML Model

A UML model of the program’s classes is shown below. Each of the classes is described following the image.

UML_model_POST_CROP

Membership Functions

Membership functions are used to define the ranges and computation method with which numerical values are evaluated for degree of membership in a fuzzy category. Each fuzzy category (called a LinguisticSet in the code and UML model) will have a membership function associated with it.

We start with an abstract class called MembershipFunction. From this abstract class we generate four child classes corresponding to different ways of evaluating membership. They are StandardMBF_S, StandardMBF_Z, StandardMBF_Pi, and StandardMBF_Lambda. Objects instantiated from these child classes are associated with a LinguisticSet object during instantiation of the LinguisticSet object.

We now examine one of the membership functions, StandardMBF_Z, in more detail: This is a “Z” function that returns one if a given numerical value is below a minimum value, zero if the test value is above a maximum value, and a linear interpolation between one and zero if the test value falls between the minimum and maximum. An example of what this function looks like is:

MBF_Z
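A minimal Python sketch of the same interpolation logic (an illustration of the computation only, not the C++ class's interface):

def membership_z(x, minimum, maximum):
    # "Z"-shaped membership: 1 below the minimum, 0 above the maximum,
    # and a linear interpolation between the two otherwise
    if x <= minimum:
        return 1.0
    if x >= maximum:
        return 0.0
    return (maximum - x) / float(maximum - minimum)

print(membership_z(-7.0, -5, 0))   # 1.0
print(membership_z(-2.5, -5, 0))   # 0.5
print(membership_z(3.0, -5, 0))    # 0.0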

Examples of the other three membership functions are:

MBF_other

Categories (LinguisticSet)

Categories, called "linguistic sets" in the code, encapsulate a membership function and a name. The membership function is used to compute the degree of membership of a value in the category. Linguistic sets are combined into LinguisticDomain objects according to a common variable, such as distance in the description above.

LinguisticDomain

"Linguistic domains" are collections of categories related by a common variable, such as distance in the case described above; they might define, for instance, the set ("very near", "near", "neither near nor far", "far", "very far"). They are used to relate linguistic set objects through that common variable.

FuzzyValue

Once a linguistic domain is established, a FuzzyValue object can be defined on it that enables fuzzy reasoning using the categories in the domain. This class plays the role of the variable under consideration by the elements of the linguistic domain, and can be set to an exact number if that number is known at the beginning of the computations. For example, one might be executing a fuzzy reasoning procedure involving domains A, B, and C, where the FuzzyValue result for C is computed using known measurements and membership functions for A and B. The example program shown below will illustrate how to use FuzzyValue.

Example Program

The following example comes from Constantin von Altrock's book "Fuzzy Logic & Neurofuzzy Applications Explained", which is a very accessible introduction to the subject. It consists of a crane operation where a program must decide through fuzzy reasoning how much power to apply to the crane, given known measurements of distance and angle. This code is also contained in the file "main.cc" that comes with the program when you check it out from GitHub.

The program starts by defining membership functions and linguistic sets for the various options for distance, e.g., “too far” or “medium”. These are then compiled into a linguistic domain for distance. Similarly, membership functions, linguistic sets, and linguistic domains are defined for angle measurements and power values (power values will be computed).

The program then declares values for the known quantities distance (12 yards) and angle (4 degrees). These values are associated with their respective domains upon declaration. It then declares a fuzzy value for power, but does not specify a known measurement, since we are computing it using fuzzy reasoning.

We then apply the following logic: if distance is medium and angle is small positive, apply positive medium power. If distance is medium and angle is zero, apply zero power. Finally, if distance is far and angle is zero, apply positive medium power. Note that "small positive", "positive medium", "medium", "zero", and "far" are all categories, not exact values.

To reason through these categories to choose an outcome, the measured values of distance and angle are each assigned degrees of membership in the categories of their respective domains. The logic is then applied through fuzzy reasoning to assign degrees of membership of the power value in the power categories. Finally, an exact ("crisp") value for power is inferred from the computed membership degrees and reported.

#include "FuzzyValue.hh"
#include <iostream>

int main()
{
  std::cout << std::endl;
  std::cout << "This program demonstrates Dan Williams' yet untitled" << std::endl;
  std::cout << "fuzzy logic toolkit." << std::endl;
  std::cout << std::endl;
  std::cout << "It implements the crane example in Constantin von Altrock's book " << std::endl;
  std::cout << "'Fuzzy Logic & Neurofuzzy Applications Explained'" << std::endl;

  std::cout << std::endl;
  std::cout << "Distance sensor reads 12 yards." << std::endl;
  std::cout << "Angle sensor reads 4 degrees." << std::endl;

  /*
  create and name a linguistic domain for distance
  */
  LinguisticDomain* distance_domain = new LinguisticDomain("distance_domain");

  /*
  define some linguistic values and membership functions for the distance domain
  */

  // too_far
  StandardMBF_Z* distance_domain_too_far_membership = new StandardMBF_Z(-5, 0);
  LinguisticSet* distance_domain_too_far = new LinguisticSet(distance_domain_too_far_membership, "too_far");

  // zero
  StandardMBF_Lambda* distance_domain_zero_membership = new StandardMBF_Lambda(-5, 0, 5);
  LinguisticSet* distance_domain_zero = new LinguisticSet(distance_domain_zero_membership, "zero");

  // close
  StandardMBF_Lambda* distance_domain_close_membership = new StandardMBF_Lambda(0, 5, 10);
  LinguisticSet* distance_domain_close = new LinguisticSet(distance_domain_close_membership, "close");

  // medium
  StandardMBF_Lambda* distance_domain_medium_membership = new StandardMBF_Lambda(5, 10, 30);
  LinguisticSet* distance_domain_medium = new LinguisticSet(distance_domain_medium_membership, "medium");

  // far
  StandardMBF_S* distance_domain_far_membership = new StandardMBF_S(10, 30);
  LinguisticSet* distance_domain_far = new LinguisticSet(distance_domain_far_membership, "far");

  /*
  Add the linguistic values to the distance domain
  */
  distance_domain->addLinguisticSet(distance_domain_too_far);
  distance_domain->addLinguisticSet(distance_domain_zero);
  distance_domain->addLinguisticSet(distance_domain_close);
  distance_domain->addLinguisticSet(distance_domain_medium);
  distance_domain->addLinguisticSet(distance_domain_far);

  /*
  create and name a linguistic domain for angle
  */
  LinguisticDomain* angle_domain = new LinguisticDomain("angle_domain");

  /*
  define some linguistic values and membership functions for the angle domain
  */

  // neg_big
  StandardMBF_Z* angle_domain_neg_big_membership = new StandardMBF_Z(-45, -5);
  LinguisticSet* angle_domain_neg_big = new LinguisticSet(angle_domain_neg_big_membership, "neg_big");

  // neg_small
  StandardMBF_Lambda* angle_domain_neg_small_membership = new StandardMBF_Lambda(-45, -5, 0);
  LinguisticSet* angle_domain_neg_small = new LinguisticSet(angle_domain_neg_small_membership, "neg_small");

  // zero
  StandardMBF_Lambda* angle_domain_zero_membership = new StandardMBF_Lambda(-5, 0, 5);
  LinguisticSet* angle_domain_zero = new LinguisticSet(angle_domain_zero_membership, "zero");

  // pos_small
  StandardMBF_Lambda* angle_domain_pos_small_membership = new StandardMBF_Lambda(0, 5, 45);
  LinguisticSet* angle_domain_pos_small = new LinguisticSet(angle_domain_pos_small_membership, "pos_small");

  // pos_big
  StandardMBF_S* angle_domain_pos_big_membership = new StandardMBF_S(5, 45);
  LinguisticSet* angle_domain_pos_big = new LinguisticSet(angle_domain_pos_big_membership, "pos_big");

  /*
    Add the linguistic values to the angle domain
  */
  angle_domain->addLinguisticSet(angle_domain_neg_big);
  angle_domain->addLinguisticSet(angle_domain_neg_small);
  angle_domain->addLinguisticSet(angle_domain_zero);
  angle_domain->addLinguisticSet(angle_domain_pos_small);
  angle_domain->addLinguisticSet(angle_domain_pos_big);

  /*
  create and name a linguistic domain for power
  */
  LinguisticDomain* power_domain = new LinguisticDomain("power_domain");

  /*
  define some linguistic values and membership functions for the power domain
  */

  // neg_high
  StandardMBF_Lambda* power_domain_neg_high_membership = new StandardMBF_Lambda(-30, -25, -8);
  LinguisticSet* power_domain_neg_high = new LinguisticSet(power_domain_neg_high_membership, "neg_high");

  // neg_medium
  StandardMBF_Lambda* power_domain_neg_medium_membership = new StandardMBF_Lambda(-25, -8, 0);
  LinguisticSet* power_domain_neg_medium = new LinguisticSet(power_domain_neg_medium_membership, "neg_medium");

  // zero
  StandardMBF_Lambda* power_domain_zero_membership = new StandardMBF_Lambda(-8, 0, 8);
  LinguisticSet* power_domain_zero = new LinguisticSet(power_domain_zero_membership, "zero");

  // pos_medium
  StandardMBF_Lambda* power_domain_pos_medium_membership = new StandardMBF_Lambda(0, 8, 25);
  LinguisticSet* power_domain_pos_medium = new LinguisticSet(power_domain_pos_medium_membership, "pos_medium");

  // pos_high
  StandardMBF_Lambda* power_domain_pos_high_membership = new StandardMBF_Lambda(8, 25, 20);
  LinguisticSet* power_domain_pos_high = new LinguisticSet(power_domain_pos_high_membership, "pos");

  /*
  add the linguistic values to the power domain
  */
  power_domain->addLinguisticSet(power_domain_neg_high);
  power_domain->addLinguisticSet(power_domain_neg_medium);
  power_domain->addLinguisticSet(power_domain_zero);
  power_domain->addLinguisticSet(power_domain_pos_medium);
  power_domain->addLinguisticSet(power_domain_pos_high);

  /*
  "Fuzzify" sensor readings
  */
  FuzzyValue* distance = new FuzzyValue(distance_domain);
  distance->setCrispValue(12);
  FuzzyValue* angle = new FuzzyValue(angle_domain);
  angle->setCrispValue(4);

  /*
  Create a fuzzy variable to store power inference calculations
  */
  FuzzyValue* power = new FuzzyValue(power_domain);

  /*
  Fuzzy inference of power value
  */
  power->OR_setSetMembership( distance->AND("medium", angle, "pos_small"), "pos_medium" );
  power->OR_setSetMembership( distance->AND("medium", angle, "zero"), "zero" );
  power->OR_setSetMembership( distance->AND("far", angle, "zero"), "pos_medium" );

  /*
  "Defuzzify" infered power value
  */
  long double power_setting;
  power_setting = power->getCrispValue();
  std::cout << "Set power to " << power_setting << " kW." << std::endl;

  std::cout << std::endl;
  return 0;
}

example_screenshot


EC2 spot instance price change: no correlation with day of week

My plans for world domination involve heavy use of Amazon EC2 instances, but I have to be frugal about it, so I'm running spot instances to save cash. A means of forecasting spot instance prices would therefore be helpful.

Thus far I’ve had little success using mainstream forecasting tools such as ARIMA and exponential smoothing. So I’m now looking for explanatory variables to use in regressions. I thought I’d start with day of the week as a variable to see if there is a pattern.

I sampled 11 weeks of price changes (code below) from the two US west coast EC2 regions, counting how many times prices went up or down by the day of the week on which each price change occurred. Doing so produced the following measurements:

table_POST_CROPPED

From these results we see that price changes occur less often on weekends, and that on weekdays price increases occur slightly more frequently than price decreases, though not by much.

I also logged the percent change in price for each price change and produced the following box plot (outliers are not displayed for now since they flood the image):

boxplot_wo_outliers

Again we see that price change variability decreases on weekends. However, the percent change in price per price change nets out to roughly zero for every day of the week. Therefore, I do not think the day of the week will be a useful predictor in a future model of EC2 spot prices. It may be possible to tighten forecast confidence intervals on weekends, given the decreased price change variability, but I'm not sure yet.

Plotting the box plot with the outliers displayed shows that the distributions are right-skewed. I haven't yet figured out how these extreme values are balanced on the negative side, but there might be some useful information in this behavior.

boxplot_with_outliers

Code

Here is Python code to fetch the source data from Amazon. Your results will vary as Amazon only provides a certain number of past values depending on when you run the code:

# import useful libraries
import datetime
import os

# user settings
days_to_go_back = 200
access_key = 'your access key'
secret_key = 'your secret key'

# compute the start time
today = datetime.date.today()
delta = datetime.timedelta(days=-days_to_go_back)
start_time = str(today + delta) + 'T00:00:00'

# produce the cmd
cmd = 'ec2-describe-spot-price-history -H --aws-access-key ' + access_key + ' --aws-secret-key ' + secret_key + ' --start-time ' + start_time

# regions
cmd_list = []
regions = ['us-east-1', 'us-west-1', 'us-west-2']
for region in regions:
    cmd_list.append(cmd + ' --region ' + region)

# execute commands
os.system('rm output/AWS_price_history_data.txt')
os.system('touch output/AWS_price_history_data.txt')
for c in cmd_list:
    os.system(c + ' >> output/AWS_price_history_data.txt')

Here is Python code to produce the table and data for the box plots:

# load useful libraries
import datetime
import pytz
from pytz import timezone
import math

# timezones
eastern = timezone('US/Eastern')
pacific = timezone('US/Pacific')

# load data
data = {}
f = open('output/AWS_price_history_data.txt')
for i, line in enumerate(f):
    line = line.strip()
    if line.find('AvailabilityZone') >= 0:  continue
    price = line.split('\t')[1]
    timestamp = line.split('\t')[2]
    instance_type = line.split('\t')[3]
    description = line.split('\t')[4]
    zone = line.split('\t')[5]

    if zone.find('east') >= 0:  continue  # consider only west coast for now since date ranges differ between east and west

    year = int(timestamp.split('T')[0].split('-')[0])
    month = int(timestamp.split('T')[0].split('-')[1])
    day = int(timestamp.split('T')[0].split('-')[2])
    hour = int(timestamp.split('T')[1].split(':')[0])
    minute = int(timestamp.split('T')[1].split(':')[1])
    ts = datetime.datetime(year, month, day, hour, minute, 0, tzinfo=pytz.utc)

    if zone.find('east') >= 0:
        ts = ts.astimezone(eastern)
    if zone.find('west') >= 0:
        ts = ts.astimezone(pacific)

    if not data.has_key(instance_type):
        data[instance_type] = {}
    if not data[instance_type].has_key(description):
        data[instance_type][description] = {}
    if not data[instance_type][description].has_key(zone):
        data[instance_type][description][zone] = {'price' : [], 'timestamp' : []}

    data[instance_type][description][zone]['price'].append(float(price))
    data[instance_type][description][zone]['timestamp'].append(ts)

f.close()

# sort the prices by date
for instance_type in data.keys():
    for description in data[instance_type].keys():
        for zone in data[instance_type][description].keys():

            price_list = data[instance_type][description][zone]['price']
            timestamp_list = data[instance_type][description][zone]['timestamp']

            indices = [i[0] for i in sorted(enumerate(timestamp_list), key=lambda x:x[1])]
            new_timestamp_list = []
            new_price_list = []
            for i in indices:
                new_timestamp_list.append(timestamp_list[i])
                new_price_list.append(price_list[i])

            data[instance_type][description][zone]['price'] = new_price_list
            data[instance_type][description][zone]['timestamp'] = new_timestamp_list

# get days of week
for instance_type in data.keys():
    for description in data[instance_type].keys():
        for zone in data[instance_type][description].keys():
            timestamp_list = data[instance_type][description][zone]['timestamp']
            day_of_week_list = []
            for t in timestamp_list:
                day_of_week_list.append(t.date().weekday())
            data[instance_type][description][zone]['day_of_week_list'] = day_of_week_list

# need to make sure there is an exact number of full weeks in the analysis
for instance_type in data.keys():
    for description in data[instance_type].keys():
        for zone in data[instance_type][description].keys():
            timestamp_list = data[instance_type][description][zone]['timestamp']
            days_diff = (timestamp_list[-1] - timestamp_list[0]).days
            weeks_diff_to_use = int(math.floor(float(days_diff) / 7.)) - 1  # subtracting one allows us to shift the days without error
            data[instance_type][description][zone]['weeks'] = weeks_diff_to_use

            shift_dt = datetime.timedelta(days = 3)  # we are shifting by three days to deal with possible artifacts at beginning of data set
            dt = datetime.timedelta(weeks = weeks_diff_to_use)
            cutoff_time = timestamp_list[0] + dt + shift_dt
            start_cutoff_time = timestamp_list[0] + shift_dt

            for i in range(0, len(timestamp_list)):
                if timestamp_list[i] > cutoff_time:
                    break

            for j in range(0, len(timestamp_list)):
                if timestamp_list[j] > start_cutoff_time:
                    break

            data[instance_type][description][zone]['price'] = data[instance_type][description][zone]['price'][j:i]
            data[instance_type][description][zone]['timestamp'] = data[instance_type][description][zone]['timestamp'][j:i]
            data[instance_type][description][zone]['day_of_week_list'] = data[instance_type][description][zone]['day_of_week_list'][j:i]

# count price rises
price_rises_by_weekday = {0 : 0, 1 : 0, 2 : 0, 3 : 0, 4 : 0, 5 : 0, 6 : 0}
price_falls_by_weekday = {0 : 0, 1 : 0, 2 : 0, 3 : 0, 4 : 0, 5 : 0, 6 : 0}
percent_change_by_weekday = {0 : [], 1 : [], 2 : [], 3 : [], 4 : [], 5 : [], 6 : []}
for instance_type in data.keys():
    for description in data[instance_type].keys():
        for zone in data[instance_type][description].keys():
            price_list = data[instance_type][description][zone]['price']
            day_of_week_list = data[instance_type][description][zone]['day_of_week_list']
            for i in range(1, len(price_list)):
                change_in_price = price_list[i] - price_list[i-1]

                percent_change = change_in_price / price_list[i-1]

                percent_change_by_weekday[day_of_week_list[i]].append(percent_change)

                if change_in_price > 0.00001:
                    price_rises_by_weekday[day_of_week_list[i]] += 1
                if change_in_price < -0.00001:
                    price_falls_by_weekday[day_of_week_list[i]] += 1

# output percent change
weekdays = {0 :'Monday', 1 : 'Tuesday', 2 : 'Wednesday', 3  : 'Thursday', 4 : 'Friday', 5 : 'Saturday', 6 : 'Sunday'}
f = open('output/percent_change.csv', 'w')
f.write('weekday,percent.change\n')
for i in sorted(percent_change_by_weekday.keys()):
    for value in percent_change_by_weekday[i]:
        f.write(weekdays[i] + ',' + str(value) + '\n')
f.close()

# output counts
for i in sorted(price_rises_by_weekday.keys()):
    print weekdays[i] + ':  ' + str(price_rises_by_weekday[i])
print
for i in sorted(price_falls_by_weekday.keys()):
    print weekdays[i] + ':  ' + str(price_falls_by_weekday[i])
print
for i in sorted(price_falls_by_weekday.keys()):
    print weekdays[i] + ':  ' + str(price_rises_by_weekday[i] - price_falls_by_weekday[i])

Here is the R code used to make the plots:

# load the data
data <- read.csv("percent_change.csv")
str(data)

# reorder the factors
data$y = factor(data$weekday, levels=c("Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"))

# boxplot with no outliers
boxplot(percent.change ~ y, data=data, outline=FALSE, main="Percent Change in Spot Instance Price for all Changes in 11-Week Period", ylab="Percent Change")

# boxplot with outliers
boxplot(percent.change ~ y, data=data, main="Percent Change in Spot Instance Price for all Changes in 11-Week Period", ylab="Percent Change")

Apache Spark and stock price causality

The Challenge

I wanted to compute Granger causality (described below) for each pair of stocks listed on the New York Stock Exchange. Moreover, I wanted to analyze between one and thirty lags for each pair's comparison. Needless to say, this requires massive computing power. I used Amazon EC2 as the computing platform, but needed a smooth way to architect and parallelize the computation, so I turned to Apache Spark (also described below). Code implementing the computation is included at the bottom of this post.

Granger Causality

Granger causality provides a statistical measure of how much the information in one time series predicts the information in another. It works by testing whether lagged values of the expected predictor series improve a model of the other series beyond what that series' own past values provide, for a specified number of lags. The result is a test statistic indicating the degree of Granger causality. Note that this is not a measure of true causality, as both time series might actually be driven by a third process.

Consider, for example, the daily change in the closing prices of stocks BQH and WMB. Using the Python StatsModels package's Granger test procedure (code below), I computed a p-value of 2.205e-26 for the null hypothesis that the daily change in BQH's closing price does not Granger cause the daily change in WMB's closing price at five lags. I therefore concluded that change in BQH does Granger cause change in WMB at five trading days. I then plotted the two normalized changes in price (lagged by five days for BQH) over 20 trading days. In the plot below, the WMB price change and the five-day-lagged BQH price change move in the same direction on 14 of the 20 days, indicating that change in BQH's closing price might be a useful predictor of future change in WMB's closing price.

BQH_WMB_comparison
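A minimal sketch of that single-pair test with statsmodels; the two arrays here are simulated stand-ins for the date-aligned daily price differences, not the actual BQH and WMB data:

import numpy as np
from statsmodels.tsa.stattools import grangercausalitytests

# simulated stand-ins for the date-aligned daily price differences
np.random.seed(0)
bqh_diff = np.random.normal(0, 1, 500)
wmb_diff = 0.5 * np.roll(bqh_diff, 5) + np.random.normal(0, 1, 500)

# statsmodels tests whether the second column Granger causes the first,
# so the WMB differences go in column 0 and the BQH differences in column 1
test_array = np.column_stack([wmb_diff, bqh_diff])
results = grangercausalitytests(test_array, maxlag=5, verbose=False)

# p-value of the ssr-based F test at five lags
print(results[5][0]['ssr_ftest'][1])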

Apache Spark

Apache Spark is the new "it thing" in data science these days, so I thought it a good idea to learn it. Essentially, it generalizes MapReduce, runs much faster than Hadoop, and provides Java, Scala, and Python interfaces. It is not limited to the two-step MapReduce procedure; it easily allows multiple map and reduce operations to be chained together, along with other useful data processing steps such as set unions, sorting, and generation of Cartesian products. The framework largely takes care of parallelization during these steps, though it offers some opportunity for fine tuning.

Apache Spark can be run on a local machine using multiple cores or on a cluster, though I've had more luck with parallelization using a single-worker cluster than with a local machine. The learning curve was comparable to that of Hadoop; writing applications that use the paradigm is quite easy once you "get it". Map and reduce operations are specified as functions in one of the three languages listed above and are parallelized by the various map and reduce commands provided by Spark. Variables can be shared across the parallel processes to aid procedures requiring storage of global values.

The Source Data

I started with daily closing stock prices for each trading day between January 1, 2000 and October 30, 2014, which I pulled from Yahoo Finance using Python’s Pandas package. I placed this data in one CSV file with the following format:

source_data_POST_CROP

The Code

Here is the Python code, with explanation, necessary to run the pairwise Granger causality computation in Apache Spark.

First, we import the necessary libraries:

from pyspark import SparkContext, SparkConf
import datetime
import pandas as pd
from statsmodels.tsa import stattools
import numpy as np

We next declare a SparkContext object with configuration settings. The "setMaster" argument is specific to the cluster you create. The ports listed in the port arguments must be open on the cluster instances, a "gotcha" on EC2-based clusters especially:

conf = (SparkConf()
        .setMaster("spark://ip-123-123-123-123:7077")
        .setAppName("spark test")
        .set('spark.executor.memory', '3g')
        .set('spark.driver.port', 53442)
        .set('spark.blockManager.port', 54321)
        .set('spark.executor.port', 12345)
        .set('spark.broadcast.port', 22222)
        .set('spark.fileserver.port', 33333))
sc = SparkContext(conf = conf)

We then read the source data into a Spark data object:

combined_file = sc.textFile('/home/ec2-user/data.csv')

Next, we extract from each line the stock symbol as a string, the date as a datetime object, and the closing price as a floating point number:

close = combined_file.map(lambda line: ((line.split(',')[0], datetime.datetime(int(line.split(',')[1].split('-')[0]), int(line.split(',')[1].split('-')[1]), int(line.split(',')[1].split('-')[2]))), float(line.split(',')[5])))

We reorganize the last dataset as a key-value set with the symbol as the key. We could have combined this with the previous step, but the process is fast and I didn't want to break working code.

close_by_symbol = close.map(lambda a: (a[0][0], (a[0][1], a[1])))

We now group the entries by stock symbol. This produces a key-value set where each key is a stock symbol and each value is an iterator over a list of (date, price) tuples.

close_grouped_by_symbol = close_by_symbol.groupByKey()

We define a function to compute the one-day difference in closing prices for each symbol, because we conduct the Granger causality analysis on the daily price differences rather than on the prices themselves.

def make_diff(a):
    datelist = []
    pricelist = []
    for i in a[1]:
        datelist.append(i[0])
        pricelist.append(i[1])
    sorted_indices = [x[0] for x in sorted(enumerate(datelist), key=lambda q:q[1])]
    datelist_sorted = []
    pricelist_sorted = []
    for i in sorted_indices:
        datelist_sorted.append(datelist[i])
        pricelist_sorted.append(pricelist[i])
    price_diff = []
    diff_date = []
    for i in range(1, len(pricelist_sorted)):
        price_diff.append(pricelist_sorted[i] - pricelist_sorted[i-1])
        diff_date.append(datelist_sorted[i])
    return a[0], zip(diff_date, price_diff)

Next, we execute a map operation using the function defined above to produce a dataset containing the one-day difference values:

close_diff_by_symbol = close_grouped_by_symbol.map(make_diff)

We now have key-value pairs where the keys are stock symbols and the values are lists of (date, price difference) tuples. We want the Cartesian product of this dataset with itself, so that every stock symbol's price difference data (with dates) is matched against every other stock symbol's. Apache Spark really shines at this task, providing the "cartesian" function for the purpose:

cartesian_matched = close_diff_by_symbol.cartesian(close_diff_by_symbol)

We define a function that will match the price difference values for each symbol pair by date. This returns a tuple (symbol 1, symbol 2, price difference list 1, price difference list 2), where the lists are aligned by trading date:

def match(a):
    case_1 = a[0]
    case_2 = a[1]
    symbol1 = case_1[0]
    symbol2 = case_2[0]
    ts1 = case_1[1]
    ts2 = case_2[1]
    time1 = []
    price1 = []
    for i, j in ts1:
        time1.append(i)
        price1.append(j)
    time2 = []
    price2 = []
    for i, j in ts2:
        time2.append(i)
        price2.append(j)
    series_1 = pd.Series(price1, index=time1)
    series_2 = pd.Series(price2, index=time2)
    df_1 = pd.DataFrame(series_1, columns=['1'])
    df_2 = pd.DataFrame(series_2, columns=['2'])
    merged = df_1.join(df_2, how='inner')
    return (symbol1, symbol2, merged['1'].tolist(), merged['2'].tolist())

We then execute this function in a map operation:

matched = cartesian_matched.map(match)

We can now define our function to compute Granger causality from the date-matched pairs of price differences for each stock symbol pair. Lag values are used as keys in the dictionary:

def gc(a):
    m = 30
    s1 = a[0]
    s2 = a[1]
    list1 = a[2]
    list2 = a[3]
    test_array = np.array([list1, list2], np.int64)
    p_ssr_ftest_dict = {}
    try:
        gc = stattools.grangercausalitytests(test_array.T, m, verbose=False)
        for q in gc.keys():
            p_ssr_ftest_dict[q] = gc[q][0]['ssr_ftest'][1]
    except:
        for q in range(1, m+1):
            p_ssr_ftest_dict[q] = None
    return (s1, s2, p_ssr_ftest_dict)

We execute this function in a map operation:

symbol_pair_gc = matched.map(gc)

We define a function for converting the results to CSV format:

def report(a):
    s1 = a[0]
    s2 = a[1]
    fdict = a[2]
    report_list = []
    for q in sorted(fdict.keys()):
        report_list.append(str(fdict[q]))
    report_string = s1 + ',' + s2 + ',' + ','.join(report_list)
    return report_string

We execute a map operation using this function to create CSV output:

report_csv = symbol_pair_gc.map(report)

Finally, we save the results to a file. Note that when running on a cluster, the path "/home/ec2-user/output/gc_output/_temporary" must exist on the worker nodes before starting the program, and "/home/ec2-user/output/gc_output" must not exist on the head node at start time.

report_csv.saveAsTextFile('/home/ec2-user/output/gc_output')

The Output

The program generates Granger causality test p-values for each stock symbol pair for lags one through 30, e.g.,

output_data_POST_CROP

Conclusion

There you have it: a way to compute pairwise Granger causality for every stock pair on the New York Stock Exchange using Apache Spark. I'll let you know if an investment strategy emerges from this work.


hacking the stock market (part 1)

Caveat: I am not a technical investor, just a hobbyist, so take this analysis with a grain of salt. I am also just beginning my Master's work in statistics.

I wanted to examine the correlation between changes in the daily closing price of the Dow Jones Industrial Average (DJIA) and lags of those changes, to see if there is a pattern I could use. First I downloaded the DJIA data from Yahoo using Pandas:

#
# load useful libraries
#
from pandas.io.data import DataReader
from datetime import datetime
import matplotlib.pyplot as plt
import numpy as np
import math
from scipy.stats.stats import pearsonr
from scipy.stats.stats import spearmanr

#
# load DJIA data from Yahoo server
#
djia = DataReader("DJIA",  "yahoo", datetime(2000,1,1), datetime.today())

I then generated the autocorrelation plots for the one-day differenced closing prices and the signs of the one-day differenced closing prices:

#
# investigate the diff between DJIA closing prices
#
diff_1st_order = djia["Adj Close"].diff()
diff_1st_order_as_list = []
for d in diff_1st_order:
    if not np.isnan(d):
        diff_1st_order_as_list.append(d)
plt.subplot(2, 1, 1)
plt.acorr(diff_1st_order_as_list, maxlags=10)
plt.title("Autocorrelation of Diff of DJIA Adjusted Close")
plt.xlabel("Lag")
plt.ylabel("Correlation")

#
# sign of diff, not diff itself
#
diff_1st_order_sign = []
for d in diff_1st_order_as_list:
    if not np.isnan(d / abs(d)):
        diff_1st_order_sign.append(d / abs(d))
    else:
        diff_1st_order_sign.append(0)
plt.subplot(2, 1, 2)
plt.acorr(diff_1st_order_sign, maxlags=10)
plt.title("Autocorrelation of Sign of Diff of DJIA Adjusted Close")
plt.xlabel("Lag")
plt.ylabel("Correlation")

first_look_at_DJIA

There is a very small negative correlation between the one-day closing price difference and its one-day lag, and an even smaller positive correlation between the one-day closing price difference and its three-day lag.
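
To put rough numbers on those two bumps, the lag-1 and lag-3 autocorrelations can also be computed directly from the same diff list (an optional check; the values will be close to, but not identical to, the bar heights above, since acorr does not subtract the mean while a correlation coefficient does):

# compute the lag-1 and lag-3 autocorrelations of the diffs explicitly
d = np.array(diff_1st_order_as_list)
for lag in (1, 3):
    r = np.corrcoef(d[:-lag], d[lag:])[0, 1]
    print 'lag', lag, 'autocorrelation:', round(r, 4)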

So I set out to find the proportion of days on which the direction (sign) of the one-day closing price difference changes from one day to the next:

#
# frequencies of 1-day lag changes in direction of closing price
#
count_opposite = 0
count_same = 0
i_list = []
j_list = []
for i in range(0, len(diff_1st_order_as_list) - 1):
    price_diff_i = diff_1st_order_as_list[i]
    price_diff_j = diff_1st_order_as_list[i+1]  # one trading day ahead

    i_list.append(price_diff_i)
    j_list.append(price_diff_j)

    sign_of_price_diff_i = 0
    if not np.isnan(price_diff_i / abs(price_diff_i)):
        sign_of_price_diff_i = int(price_diff_i / abs(price_diff_i))

    sign_of_price_diff_j = 0
    if not np.isnan(price_diff_j / abs(price_diff_j)):
        sign_of_price_diff_j = int(price_diff_j / abs(price_diff_j))

    if sign_of_price_diff_i == sign_of_price_diff_j:
        count_same += 1
    else:
        count_opposite += 1

print
print 'Correlation coefficients for the diff lists:'
print '\t', 'Pearson R: ', pearsonr(i_list, j_list)[0]
print '\t', 'Spearman R: ', spearmanr(i_list, j_list)[0]
print

print 'Amount of time closing value direction remains the same: ', round(float(count_same) / (float(count_same) + float(count_opposite)), 3)
amount_time_changes = float(count_opposite) / (float(count_same) + float(count_opposite))
print 'Amount of time closing value direction changes: ', round(amount_time_changes, 3)
L = amount_time_changes - 1.959964*((math.sqrt(amount_time_changes*(1.0 - amount_time_changes)))/math.sqrt(float(len(diff_1st_order_as_list))))
U = amount_time_changes + 1.959964*((math.sqrt(amount_time_changes*(1.0 - amount_time_changes)))/math.sqrt(float(len(diff_1st_order_as_list))))
print 'Agresti-Coull C.I.: ', round(L, 3), '< p <', round(U, 3)
print

first_look_at_DJIA_OUTPUT_1

This analysis tells me that in the long run (at least over the period for which I pulled DJIA data), betting on one-day changes of direction in the DJIA closing price would slowly pay off. (However, we are ignoring the magnitudes of the changes in this analysis; the magnitudes may be insufficient to be worth the price of a trade. A future analysis will investigate this.) The correlation test between the two series (the one-day difference and its one-day lag) shows values near zero, so treating the day-to-day outcomes as approximately independent for the confidence interval is reasonable, although this betting scheme relies on the thinnest correlation detected by the autocorrelation plot.
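
One caveat on the interval itself: the formula above is the plain normal-approximation (Wald) interval built around the raw proportion. The Agresti-Coull version first adds z^2/2 pseudo-successes and z^2 pseudo-trials and then applies the same formula to the adjusted estimate; with a sample this large the two intervals are nearly identical, but here is a minimal sketch of the adjusted calculation using the counts from above:

# Agresti-Coull interval (sketch): adjust the estimate with z^2/2 pseudo-successes
# and z^2 pseudo-trials, then apply the usual normal-approximation half-width
z = 1.959964
n_trials = float(count_same) + float(count_opposite)
x_changes = float(count_opposite)
n_adj = n_trials + z**2
p_adj = (x_changes + (z**2) / 2.0) / n_adj
half_width = z * math.sqrt(p_adj * (1.0 - p_adj) / n_adj)
print 'Agresti-Coull C.I. (adjusted): ', round(p_adj - half_width, 3), '< p <', round(p_adj + half_width, 3)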

There was another possible pattern in the autocorrelation plot above: a positive correlation at the three-day lag alongside the negative correlation at the one-day lag. I decided to check whether combining the two is informative, by measuring how often the current day's change runs in the opposite direction of the one-day lag and in the same direction as the three-day lag; under the null this combination should occur about 25% of the time:

#
# frequencies of combination of 1-day lag and 3-day lag changes in direction
# of closing price
#
count_matches = 0
count_non_matches = 0
i_list = []
j_list = []
k_list = []
for i in range(0, len(diff_1st_order_as_list) - 3):
    price_diff_i = diff_1st_order_as_list[i]
    price_diff_j = diff_1st_order_as_list[i+2]  # two trading days ahead of i (the one-day lag of the current day)
    price_diff_k = diff_1st_order_as_list[i+3]  # three trading days ahead of i (the current day)

    i_list.append(price_diff_i)
    j_list.append(price_diff_j)
    k_list.append(price_diff_k)

    # price_diff_i represents the 3-day lag
    sign_of_price_diff_i = 0
    if not np.isnan(price_diff_i / abs(price_diff_i)):
        sign_of_price_diff_i = int(price_diff_i / abs(price_diff_i))

    # price_diff_j represents the 1-day lag
    sign_of_price_diff_j = 0
    if not np.isnan(price_diff_j / abs(price_diff_j)):
        sign_of_price_diff_j = int(price_diff_j / abs(price_diff_j))

    # price_diff_k represents the current day
    sign_of_price_diff_k = 0
    if not np.isnan(price_diff_k / abs(price_diff_k)):
        sign_of_price_diff_k = int(price_diff_k / abs(price_diff_k))

    if sign_of_price_diff_k != sign_of_price_diff_j and sign_of_price_diff_k == sign_of_price_diff_i:
        count_matches += 1
    else:
        count_non_matches += 1

print 'Correlation coefficients for the diff lists:'
print '\t', 'Pearson R (3-day lag vs. 1-day lag): ', pearsonr(i_list, j_list)[0]
print '\t', 'Spearman R (3-day lag vs. 1-day lag): ', spearmanr(i_list, j_list)[0]
print '\t', 'Pearson R (3-day lag vs. current day): ', pearsonr(i_list, k_list)[0]
print '\t', 'Spearman R (3-day lag vs. current day): ', spearmanr(i_list, k_list)[0]
print '\t', 'Pearson R (1-day lag vs. current day): ', pearsonr(j_list, k_list)[0]
print '\t', 'Spearman R (1-day lag vs. current day): ', spearmanr(j_list, k_list)[0]
print

amount_time_changes = float(count_matches) / (float(count_matches) + float(count_non_matches))
print 'Amount of time 1-day change is opposite direction and 3-day change is same direction: ', round(amount_time_changes, 3)
L = amount_time_changes - 1.959964*((math.sqrt(amount_time_changes*(1.0 - amount_time_changes)))/math.sqrt(float(len(diff_1st_order_as_list))))
U = amount_time_changes + 1.959964*((math.sqrt(amount_time_changes*(1.0 - amount_time_changes)))/math.sqrt(float(len(diff_1st_order_as_list))))
print 'Agresti-Coull C.I.: ', round(L, 3), '< p <', round(U, 3)
print

first_look_at_DJIA_OUTPUT_2

To complete the code, we need to show the plot:

#
# show the plot
#
plt.show()

I’m not certain the narrow margin of opportunity detected by this analysis is sufficient for the development of a trading strategy. More investigation is needed.


Kaplan-Meier estimator in Python

The following Python class computes and draws Kaplan-Meier product limit estimators for given data. An example of how to use the class follows the code.
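
For reference, the class implements the standard product-limit estimator

\hat{S}(t) = \prod_{j:\ t_j \le t} \left(1 - \frac{d_j}{n_j}\right)

where d_j is the number of events observed at time t_j and n_j is the number of units still at risk just prior to t_j; these correspond to the event_occurred_times and number_of_units_at_risk_just_prior_to_time_j dictionaries in the class below.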

Code

# load useful libraries
import matplotlib.pyplot as plt

#
# class for building Kaplan-Meier product limit estimator
#
class KM(object):

    #
    # constructor
    #
    def __init__(self, measured_values, censored_or_not):
        self.measured_values = measured_values
        self.censored_or_not = censored_or_not

        # log measured values where event occurred
        self.event_occurred_times = {}
        self.censored_occurred_times = {}
        for i, mv in enumerate(self.measured_values):
            if self.censored_or_not[i] == 1:
                if not self.event_occurred_times.has_key(mv):
                    self.event_occurred_times[mv] = 0
                self.event_occurred_times[mv] += 1
            else:
                if not self.censored_occurred_times.has_key(mv):
                    self.censored_occurred_times[mv] = 0
                self.censored_occurred_times[mv] += 1

        # construct list of j values
        j_list = [0]
        j_list.extend(sorted(self.event_occurred_times.keys()))
        j_list.append(max(max(self.event_occurred_times.keys()), max(self.censored_occurred_times.keys())) + 1)
        self.j_list = j_list

        # count censored values in interval [j, j+1), index by j
        self.number_of_units_censored_in_interval_j_to_j_plus_one = {}
        for i in range(0, len(j_list)-1):
            j = j_list[i]
            j_plus_one = j_list[i+1]
            m_count = 0
            for m in self.censored_occurred_times.keys():
                if j <= m and m < j_plus_one:
                    m_count = m_count + self.censored_occurred_times[m]
            self.number_of_units_censored_in_interval_j_to_j_plus_one[j] = m_count

        # calculate number of units at risk just prior to time t_j
        self.number_of_units_at_risk_just_prior_to_time_j = {}
        for i in range(0, len(j_list)-1):
            j = j_list[i]
            n_count = 0.
            for k in range(i, len(j_list)-1):
                jk = j_list[k]
                if self.event_occurred_times.has_key(jk):
                    n_count = n_count + self.event_occurred_times[jk]
                if self.number_of_units_censored_in_interval_j_to_j_plus_one.has_key(jk):
                    n_count = n_count + self.number_of_units_censored_in_interval_j_to_j_plus_one[jk]
            self.number_of_units_at_risk_just_prior_to_time_j[j] = n_count

        # add time zero count to self.event_occurred_times
        self.event_occurred_times[0] = 0

        # build the estimator for each time j
        self.S = {}
        for i in range(0, len(j_list)-1):
            j = j_list[i]
            prod = 1.
            for k in range(0, i+1):
                jk = j_list[k]
                prod = prod * ((self.number_of_units_at_risk_just_prior_to_time_j[jk] - self.event_occurred_times[jk]) / self.number_of_units_at_risk_just_prior_to_time_j[jk])
            self.S[j] = prod

    #
    # display the estimator in the console
    #
    def display(self):

        print
        print '\ttime\tn.risk\tn.event\tsurvival'
        for i in range(1, len(self.j_list) - 1):
            j = self.j_list[i]
            print '\t' + str(j) + '\t' + str(int(self.number_of_units_at_risk_just_prior_to_time_j[j])) + '\t' + str(self.event_occurred_times[j]) + '\t' + str(round(self.S[j], 3))
        print

    #
    # plot
    #
    def plot(self, color, xlabel, title):
        j_list_sans_end = self.j_list[0:-1]

        # plot the curve
        plt.figure()
        for i in range(0, len(j_list_sans_end) - 1):
            j = j_list_sans_end[i]
            j_plus_one = j_list_sans_end[i+1]
            plt.plot([j, j_plus_one], [self.S[j], self.S[j]], color=color)
            plt.plot([j_plus_one, j_plus_one], [self.S[j], self.S[j_plus_one]], color=color)

        # set the axis limits
        plt.xlim([0, self.j_list[-1]])
        plt.ylim([-0.05, 1.05])

        # add the censored cases within the curve
        for i in sorted(self.censored_occurred_times.keys()):
            last_S = 1.
            for j in sorted(self.S.keys()):
                if j >= i:
                    plt.scatter(i, last_S, color=color)
                    break
                last_S = self.S[j]

        # add the censored cases beyond the curve
        for i in sorted(self.censored_occurred_times.keys()):
            max_S_key = max(self.S.keys())
            if i > max_S_key:
                plt.scatter(i, self.S[max_S_key], color=color)

        # show the plot
        plt.ylabel("Survival")
        plt.xlabel(xlabel)
        plt.title(title)
        plt.show()

Usage Example

import matplotlib.pyplot as plt
times = [3, 4, 4, 5, 7, 6, 8, 8, 12, 14, 14, 19, 20, 20]
events = [1, 1, 1, 0, 1, 0, 1, 1, 1, 1, 1, 1, 0, 0]  # 0 for censored cases, 1 for event occurred
my_km = KM(times, events)
my_km.display()
my_km.plot("blue", "Time", "Kaplan-Meier Product Limit Estimator Example")

python_kaplan-meier-console

python_kaplan_meier_demo
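
As an optional cross-check (not part of the class above), the same estimate can be computed with the third-party lifelines package, assuming it is installed, reusing the times and events lists from the example:

# cross-check against the lifelines implementation (optional; requires 'pip install lifelines')
from lifelines import KaplanMeierFitter

kmf = KaplanMeierFitter()
kmf.fit(times, event_observed=events)
print kmf.survival_function_   # should agree with the KM class's display() output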


setting up a Nexus OSS server on an Amazon EC2 instance

It took me a while to figure out how to set up a Nexus OSS server on an Amazon EC2 instance, so I’m writing down the instructions here in case they prove useful to anyone in the future.

In this case we are also changing the default Nexus OSS server port of 8081 to port 80 due to a firewall issue.

Start up an Amazon Linux EC2 instance with incoming port 22 set to “My IP” and incoming port 80 set to “Anywhere”. For the sake of these instructions, we assume the IP address of the new EC2 instance is xxx.xxx.xxx.xxx.

Log into the server using a terminal:

ssh -i your_key.pem ec2-user@xxx.xxx.xxx.xxx

Execute the following commands:

sudo yum update
sudo yum install emacs
wget http://download.sonatype.com/nexus/oss/nexus-latest-bundle.tar.gz
sudo cp nexus-latest-bundle.tar.gz /usr/local
cd /usr/local
sudo tar -xvzf nexus-latest-bundle.tar.gz
sudo rm nexus-latest-bundle.tar.gz 
sudo ln -s nexus-2.9.2-01 nexus
sudo chown -R ec2-user:ec2-user nexus
sudo chown -R ec2-user:ec2-user nexus-2.9.2-01
sudo chown -R ec2-user:ec2-user sonatype-work

Edit the Nexus server executable, changing the value of “RUN_AS_USER” to “root”:

emacs nexus/bin/nexus

Set a root password, which we’ll need later:

sudo su - root
passwd
exit

Enter the “nexus” directory and edit the conf/nexus.properties file to set the “application-port” to “80”.

cd nexus
emacs conf/nexus.properties
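
Inside conf/nexus.properties, the relevant line should end up reading:

application-port=80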

Start the server. Enter the root password when prompted:

./bin/nexus start

Wait about a minute or two for the server to fire up, and then point a web browser to http://xxx.xxx.xxx.xxx/nexus to use the server.

nexus_webapp_screenshot
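
If you would rather poll from a script than keep refreshing the browser, something like the following works (a rough sketch; substitute your instance's address for xxx.xxx.xxx.xxx):

# poll the Nexus web app until it responds (optional helper)
import time
import urllib2

url = 'http://xxx.xxx.xxx.xxx/nexus/'
for attempt in range(30):
    try:
        urllib2.urlopen(url, timeout=5)
        print 'Nexus is up at', url
        break
    except Exception:
        print 'Not up yet; retrying in 10 seconds...'
        time.sleep(10)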
