ContingencyTable.get_column_total()   A
last analyzed

Complexity

Conditions 2

Size

Total Lines 6

Duplication

Lines 0
Ratio 0 %

Importance

Changes 0
Metric Value
cc 2
c 0
b 0
f 0
dl 0
loc 6
rs 9.4285
1
import math
2
3
from pysie.dsl.set import TernarySearchSet, TernarySearchTrie
4
from pysie.stats.distributions import MeanSamplingDistribution
5
from pysie.stats.samples import SampleDistribution
6
7
from scipy.stats import f, chi2
8
9
10
class ContingencyTable(object):
    """Two-way table of numeric cells addressed by row and column name.

    Cell values live in a ``TernarySearchTrie`` keyed by the string built
    in :meth:`make_key`; the row and column names seen so far are tracked
    in two ``TernarySearchSet`` instances.
    """

    values = None
    rows = None
    columns = None

    def __init__(self):
        self.rows = TernarySearchSet()
        self.columns = TernarySearchSet()
        self.values = TernarySearchTrie()

    def set_cell(self, row_name, column_name, value):
        """Store ``value`` at (row_name, column_name) and register both names."""
        self.values.put(self.make_key(row_name, column_name), value)
        self.rows.add(row_name)
        self.columns.add(column_name)

    def get_cell(self, row_name, column_name):
        """Return the stored cell value, or 0 when the cell was never set."""
        cell_key = self.make_key(row_name, column_name)
        if self.values.contains_key(cell_key):
            return self.values.get(cell_key)
        return 0

    def make_key(self, row_name, column_name):
        """Build the storage key ``"<row_name>-<column_name>"``.

        NOTE: names are joined by plain concatenation, so names that
        themselves contain '-' can produce the same key for different cells.
        """
        return row_name + '-' + column_name

    def get_row_total(self, row_name):
        """Return the sum of the cells of ``row_name`` over all known columns."""
        return sum(self.get_cell(row_name, column_name)
                   for column_name in self.columns.to_array())

    def get_column_total(self, column_name):
        """Return the sum of the cells of ``column_name`` over all known rows."""
        return sum(self.get_cell(row_name, column_name)
                   for row_name in self.rows.to_array())

    def get_total(self):
        """Return the sum of every stored cell value."""
        return sum(self.values.values())
55
56
57
class Anova(object):
    """One-way analysis of variance over the groups found in a sample.

    The sample is split by group id; a ``SampleDistribution`` and a
    ``MeanSamplingDistribution`` are built for every group and for the
    sample as a whole, and :meth:`build` then derives the sums of squares,
    degrees of freedom, mean squares, the F statistic and its p-value.

    ``reject_mean_same`` is only populated when a significance level is
    supplied to the constructor; otherwise it stays ``None`` and
    :meth:`will_reject` can be used with an explicit level.
    """

    sample = None
    individual_samples = None
    individual_sample_distributions = None
    individual_sampling_distributions = None
    overall_sample_distribution = None
    overall_sampling_distribution = None

    sum_of_squares_total = None
    sum_of_squares_group = None
    sum_of_squares_error = None

    df_group = None
    df_error = None
    df_total = None

    mean_square_group = None
    mean_square_error = None

    F = None
    p_value = None

    significance_level = None
    reject_mean_same = None

    def __init__(self, sample, significance_level=None):
        """Split ``sample`` by group and run the analysis immediately.

        :param sample: object providing ``split_by_group_id()`` and ``size()``.
        :param significance_level: optional alpha; when given,
            ``reject_mean_same`` is set by :meth:`build`.
        """
        if significance_level is not None:
            self.significance_level = significance_level

        self.sample = sample
        self.individual_sampling_distributions = TernarySearchTrie()
        self.individual_sample_distributions = TernarySearchTrie()
        self.individual_samples = sample.split_by_group_id()
        for group_id in self.individual_samples.keys():
            sample_distribution = SampleDistribution(
                sample=self.individual_samples.get(group_id), group_id=group_id)
            sampling_distribution = MeanSamplingDistribution(
                sample_distribution=sample_distribution)
            self.individual_sample_distributions.put(group_id, sample_distribution)
            self.individual_sampling_distributions.put(group_id, sampling_distribution)

        self.overall_sample_distribution = SampleDistribution(sample=sample, group_id=None)
        self.overall_sampling_distribution = MeanSamplingDistribution(
            self.overall_sample_distribution)
        self.build()

    def build(self):
        """Compute the ANOVA quantities from the prepared distributions.

        Raises ``ZeroDivisionError`` when ``df_error`` or ``df_group`` is 0
        (e.g. a single group, or as many groups as observations).
        """
        overall = self.overall_sample_distribution
        # Total sum of squares is read from the overall distribution
        # (presumably squared deviations from the overall mean -- confirm
        # against SampleDistribution).
        self.sum_of_squares_total = overall.sum_of_squares
        mean_overall = overall.mean

        # Between-group sum of squares: size-weighted squared distance of
        # every group mean from the overall mean.
        self.sum_of_squares_group = 0
        for group_distribution in self.individual_sample_distributions.values():
            deviation = group_distribution.mean - mean_overall
            self.sum_of_squares_group += (
                math.pow(deviation, 2.0) * group_distribution.sample_size)
        self.sum_of_squares_error = self.sum_of_squares_total - self.sum_of_squares_group

        self.df_total = self.sample.size() - 1
        self.df_group = self.individual_samples.size() - 1
        self.df_error = self.df_total - self.df_group

        self.mean_square_error = self.sum_of_squares_error / self.df_error
        self.mean_square_group = self.sum_of_squares_group / self.df_group

        self.F = self.mean_square_group / self.mean_square_error
        # Survival function == 1 - cdf, but keeps precision for tiny p-values.
        self.p_value = f.sf(self.F, self.df_group, self.df_error)

        if self.significance_level is not None:
            # Reject "all group means are the same" when the p-value falls
            # below alpha -- the same rule will_reject() applies.
            self.reject_mean_same = self.p_value < self.significance_level

    def will_reject(self, significance_level):
        """Return whether the null hypothesis is rejected at ``significance_level``."""
        return self.p_value < significance_level
125
126
127
class ChiSquare(object):
    """Pearson chi-square test of independence between label and group id.

    Every observation of the sample contributes one count to the
    contingency-table cell (``row.label``, ``row.group_id``); the statistic
    compares those observed counts with the counts expected from the row
    and column totals.

    ``reject_mean_same`` (name kept for compatibility with ``Anova``) holds
    the reject/keep decision for the null hypothesis when a significance
    level is supplied, and stays ``None`` otherwise.
    """

    chiSq = None
    sample = None
    p_value = None
    df = None
    significance_level = None
    # Declared so the attribute is readable even when no level was given.
    reject_mean_same = None

    def __init__(self, sample, significance_level=None):
        """Build the contingency table from ``sample`` and run the test.

        :param sample: object providing ``size()`` and ``get(i)``; every
            item must expose ``label`` and ``group_id``.
        :param significance_level: optional alpha; when given,
            ``reject_mean_same`` is set.
        """
        self.sample = sample
        self.significance_level = significance_level

        # Tally observations into (label, group_id) cells.
        table = ContingencyTable()
        for i in range(sample.size()):
            row = sample.get(i)
            row_name = row.label
            column_name = row.group_id
            table.set_cell(row_name, column_name,
                           table.get_cell(row_name, column_name) + 1)

        # float() keeps the expected counts fractional under Python 2, where
        # int / int would silently truncate.
        total = float(table.get_total())
        row_names = table.rows.to_array()
        column_names = table.columns.to_array()

        self.chiSq = 0
        for row in row_names:
            row_total = table.get_row_total(row)
            for column in column_names:
                expected = row_total * table.get_column_total(column) / total
                observed = table.get_cell(row, column)
                self.chiSq += math.pow(observed - expected, 2) / expected

        self.df = (table.rows.size() - 1) * (table.columns.size() - 1)

        # Survival function == 1 - cdf, but keeps precision for tiny p-values.
        self.p_value = chi2.sf(self.chiSq, self.df)

        if self.significance_level is not None:
            # Reject the null hypothesis when the p-value falls below alpha --
            # the same rule will_reject() applies.
            self.reject_mean_same = self.p_value < self.significance_level

    def will_reject(self, significance_level):
        """Return whether the null hypothesis is rejected at ``significance_level``."""
        return self.p_value < significance_level
163