Completed: Pull Request — master (#906) by Eddie, created 01:40

zipline.data.BcolzDailyBarReader.history_window()   (grade: A)

Complexity: Conditions 2
Size: Total Lines 8
Duplication: Lines 0, Ratio 0 %

Metric   Value
cc       2
dl       0
loc      8
rs       9.4286
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import (
    ABCMeta,
    abstractmethod,
)
from errno import ENOENT
from os import remove
from os.path import exists
import sqlite3

from bcolz import (
    carray,
    ctable,
)
from collections import namedtuple
from click import progressbar
from numpy import (
    array,
    int64,
    float64,
    floating,
    full,
    iinfo,
    integer,
    issubdtype,
    nan,
    uint32,
)
from pandas import (
    DataFrame,
    DatetimeIndex,
    read_csv,
    Timestamp,
)
from six import (
    iteritems,
    string_types,
    with_metaclass,
)

from ._equities import _compute_row_slices, _read_bcolz_data
from ._adjustments import load_adjustments_from_sqlite

import logbook
logger = logbook.Logger('UsEquityPricing')

OHLC = frozenset(['open', 'high', 'low', 'close'])
US_EQUITY_PRICING_BCOLZ_COLUMNS = [
    'open', 'high', 'low', 'close', 'volume', 'day', 'id'
]
SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid'])
SQLITE_ADJUSTMENT_COLUMN_DTYPES = {
    'effective_date': integer,
    'ratio': floating,
    'sid': integer,
}
SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers'])


SQLITE_DIVIDEND_PAYOUT_COLUMNS = frozenset(
    ['sid',
     'ex_date',
     'declared_date',
     'pay_date',
     'record_date',
     'amount'])
SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    'sid': integer,
    'ex_date': integer,
    'declared_date': integer,
    'record_date': integer,
    'pay_date': integer,
    'amount': float,
}


SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS = frozenset(
    ['sid',
     'ex_date',
     'declared_date',
     'record_date',
     'pay_date',
     'payment_sid',
     'ratio'])
SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    'sid': integer,
    'ex_date': integer,
    'declared_date': integer,
    'record_date': integer,
    'pay_date': integer,
    'payment_sid': integer,
    'ratio': float,
}
UINT32_MAX = iinfo(uint32).max

class NoDataOnDate(Exception):
    """
    Raised when a spot price cannot be found for the sid and date.
    """
    pass

class BcolzDailyBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing daily OHLCV data to disk in a format that can be
    read efficiently by BcolzDailyBarReader.

    See Also
    --------
    BcolzDailyBarReader : Consumer of the data written by this class.
    """

    @abstractmethod
    def gen_tables(self, assets):
        """
        Return an iterator of pairs of (asset_id, bcolz.ctable).
        """
        raise NotImplementedError()

    @abstractmethod
    def to_uint32(self, array, colname):
        """
        Convert raw column values produced by gen_tables into uint32 values.

        Parameters
        ----------
        array : np.array
            An array of raw values.
        colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'}
            The name of the column being loaded.

        For output being read by the default BcolzDailyBarReader, data should
        be stored in the following manner:

        - Pricing columns (Open, High, Low, Close) should be stored as 1000 *
          as-traded dollar value.
        - Volume should be the as-traded volume.
        - Dates should be stored as seconds since midnight UTC, Jan 1, 1970.
        """
        raise NotImplementedError()
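Annotation: a worked example of the storage contract above (all values hypothetical):

    # An as-traded close of $135.76 is stored as uint32 135760 (1000 * dollars).
    # An as-traded volume of 1,200,400 shares is stored as uint32 1200400.
    # Midnight UTC on 2015-06-01 is stored as uint32 1433116800 (seconds since
    # the Unix epoch).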
    def write(self, filename, calendar, assets, show_progress=False):
        """
        Parameters
        ----------
        filename : str
            The location at which we should write our output.
        calendar : pandas.DatetimeIndex
            Calendar to use to compute asset calendar offsets.
        assets : pandas.Int64Index
            The assets for which to write data.
        show_progress : bool
            Whether or not to show a progress bar while writing.

        Returns
        -------
        table : bcolz.ctable
            The newly-written table.
        """
        _iterator = self.gen_tables(assets)
        if show_progress:
            pbar = progressbar(
                _iterator,
                length=len(assets),
                item_show_func=lambda i: i if i is None else str(i[0]),
                label="Merging asset files:",
            )
            with pbar as pbar_iterator:
                return self._write_internal(filename, calendar, pbar_iterator)
        return self._write_internal(filename, calendar, _iterator)
    def _write_internal(self, filename, calendar, iterator):
        """
        Internal implementation of write.

        `iterator` should be an iterator yielding pairs of (asset, ctable).
        """
        total_rows = 0
        first_row = {}
        last_row = {}
        calendar_offset = {}

        # Maps column name -> output carray.
        columns = {
            k: carray(array([], dtype=uint32))
            for k in US_EQUITY_PRICING_BCOLZ_COLUMNS
        }

        for asset_id, table in iterator:
            nrows = len(table)
            for column_name in columns:
                if column_name == 'id':
                    # We know what the content of this column is, so don't
                    # bother reading it.
                    columns['id'].append(full((nrows,), asset_id))
                    continue
                columns[column_name].append(
                    self.to_uint32(table[column_name][:], column_name)
                )

            # Bcolz doesn't support ints as keys in `attrs`, so convert
            # assets to strings for use as attr keys.
            asset_key = str(asset_id)

            # Calculate the index into the array of the first and last row
            # for this asset. This allows us to efficiently load single
            # assets when querying the data back out of the table.
            first_row[asset_key] = total_rows
            last_row[asset_key] = total_rows + nrows - 1
            total_rows += nrows

            # Calculate the number of trading days between the first date
            # in the stored data and the first date of **this** asset. This
            # offset is used for output alignment by the reader.

            # HACK: Index with a list so that we get back an array we can pass
            # to self.to_uint32.  We could try to extract this in the loop
            # above, but that makes the logic a lot messier.
            asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0]
            calendar_offset[asset_key] = calendar.get_loc(
                Timestamp(asset_first_day, unit='s', tz='UTC'),
            )

        # This writes the table to disk.
        full_table = ctable(
            columns=[
                columns[colname]
                for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS
            ],
            names=US_EQUITY_PRICING_BCOLZ_COLUMNS,
            rootdir=filename,
            mode='w',
        )
        full_table.attrs['first_row'] = first_row
        full_table.attrs['last_row'] = last_row
        full_table.attrs['calendar_offset'] = calendar_offset
        full_table.attrs['calendar'] = calendar.asi8.tolist()
        return full_table
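Annotation: a sketch of the metadata this produces for a hypothetical table holding two assets, where asset 1 contributes 3 rows starting on the first calendar day and asset 2 contributes 2 rows starting on the second:

    # full_table.attrs['first_row']       == {'1': 0, '2': 3}
    # full_table.attrs['last_row']        == {'1': 2, '2': 4}
    # full_table.attrs['calendar_offset'] == {'1': 0, '2': 1}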

class DailyBarWriterFromCSVs(BcolzDailyBarWriter):
    """
    BcolzDailyBarWriter constructed from a map from assets to paths of CSVs
    containing data for those assets.

    Parameters
    ----------
    asset_map : dict
        A map from asset_id -> path to csv with data for that asset.

    CSVs should have the following columns:
        day : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : int64
    """
    _csv_dtypes = {
        'open': float64,
        'high': float64,
        'low': float64,
        'close': float64,
        'volume': float64,
    }

    def __init__(self, asset_map):
        self._asset_map = asset_map

    def gen_tables(self, assets):
        """
        Read CSVs as DataFrames from our asset map.
        """
        dtypes = self._csv_dtypes
        for asset in assets:
            path = self._asset_map.get(asset)
            if path is None:
                raise KeyError("No path supplied for asset %s" % asset)
            data = read_csv(path, parse_dates=['day'], dtype=dtypes)
            yield asset, ctable.fromdataframe(data)

    def to_uint32(self, array, colname):
Review annotation (0 ignored issues): duplication introduced; this code appears to be duplicated elsewhere in the project.
        arrmax = array.max()
        if colname in OHLC:
            self.check_uint_safe(arrmax * 1000, colname)
            return (array * 1000).astype(uint32)
        elif colname == 'volume':
            self.check_uint_safe(arrmax, colname)
            return array.astype(uint32)
        elif colname == 'day':
            nanos_per_second = (1000 * 1000 * 1000)
            self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname)
            return (array.view(int) / nanos_per_second).astype(uint32)

    @staticmethod
    def check_uint_safe(value, colname):
        if value >= UINT32_MAX:
            raise ValueError(
                "Value %s from column '%s' is too large" % (value, colname)
            )
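Annotation: a minimal usage sketch of this writer; the CSV paths, calendar dates, and asset ids are hypothetical:

    from pandas import DatetimeIndex, Int64Index

    asset_map = {1: '/data/1.csv', 2: '/data/2.csv'}
    writer = DailyBarWriterFromCSVs(asset_map)
    calendar = DatetimeIndex(['2015-06-01', '2015-06-02'], tz='UTC')
    table = writer.write('daily_bars.bcolz', calendar, Int64Index([1, 2]),
                         show_progress=True)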

class BcolzDailyBarReader(object):
    """
    Reader for raw pricing data written by BcolzDailyBarWriter.

    A Bcolz CTable is composed of Columns and Attributes.

    Columns
    -------
    The table with which this loader interacts contains the following columns:

    ['open', 'high', 'low', 'close', 'volume', 'day', 'id'].

    The data in these columns is interpreted as follows:

    - Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 *
      as-traded dollar value.
    - Volume is interpreted as as-traded volume.
    - Day is interpreted as seconds since midnight UTC, Jan 1, 1970.
    - Id is the asset id of the row.

    The data in each column is grouped by asset and then sorted by day within
    each asset block.

    The table is built to represent a long time range of data, e.g. ten years
    of equity data, so the asset blocks are not all of equal length. The
    blocks are clipped to the known start and end date of each asset to cut
    down on the number of empty values that would need to be included to make
    a regular/cubic dataset.

    When read across, the open, high, low, close, and volume values at the
    same index represent the same asset and day.

    Attributes
    ----------
    The table with which this loader interacts contains the following
    attributes:

    first_row : dict
        Map from asset_id -> index of first row in the dataset with that id.
    last_row : dict
        Map from asset_id -> index of last row in the dataset with that id.
    calendar_offset : dict
        Map from asset_id -> calendar index of first row.
    calendar : list[int64]
        Calendar used to compute offsets, in asi8 format (ns since EPOCH).

    We use first_row and last_row together to quickly find ranges of rows to
    load when reading an asset's data into memory.

    We use calendar_offset and calendar to orient loaded blocks within a
    range of queried dates.
    """
    def __init__(self, table):
        if isinstance(table, string_types):
            table = ctable(rootdir=table, mode='r')

        self._table = table
        self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC')
        self._first_rows = {
            int(asset_id): start_index
            for asset_id, start_index in iteritems(table.attrs['first_row'])
        }
        self._last_rows = {
            int(asset_id): end_index
            for asset_id, end_index in iteritems(table.attrs['last_row'])
        }
        self._calendar_offsets = {
            int(id_): offset
            for id_, offset in iteritems(table.attrs['calendar_offset'])
        }
        # Cache of fully read np.array for the carrays in the daily bar table.
        # raw_array does not use the same cache, but it could.
        # Need to test keeping the entire array in memory over the course of a
        # process first.
        self._spot_cols = {}

        self.PRICE_ADJUSTMENT_FACTOR = 0.001

    def _compute_slices(self, start_idx, end_idx, assets):
        """
        Compute the raw row indices to load for each asset on a query for the
        given dates after applying a shift.

        Parameters
        ----------
        start_idx : int
            Index of first date for which we want data.
        end_idx : int
            Index of last date for which we want data.
        assets : pandas.Int64Index
            Assets for which we want to compute row indices.

        Returns
        -------
        A 3-tuple of (first_rows, last_rows, offsets):
        first_rows : np.array[intp]
            Array with length == len(assets) containing the index of the first
            row to load for each asset in `assets`.
        last_rows : np.array[intp]
            Array with length == len(assets) containing the index of the last
            row to load for each asset in `assets`.
        offsets : np.array[intp]
            Array with length == len(assets) containing the index in a buffer
            of length len(dates) corresponding to the first row of each asset.

            The value of offsets[i] will be 0 if assets[i] existed at the
            start of a query.  Otherwise, offsets[i] will be equal to the
            number of entries in `dates` for which the asset did not yet
            exist.
        """
        # The core implementation of the logic here is implemented in Cython
        # for efficiency.
        return _compute_row_slices(
            self._first_rows,
            self._last_rows,
            self._calendar_offsets,
            start_idx,
            end_idx,
            assets,
        )
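Annotation: a worked example of the returned offsets, assuming a query over 5 dates where assets[0] existed before the window began and assets[1] first traded on the window's third date:

    # offsets[0] == 0  ->  rows for assets[0] fill the output from row 0.
    # offsets[1] == 2  ->  the first two dates predate assets[1], so its rows
    #                      are written into the output starting at row 2.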

    def load_raw_arrays(self, columns, start_date, end_date, assets):
        # Assumes that the given dates are actually in calendar.
        start_idx = self._calendar.get_loc(start_date)
        end_idx = self._calendar.get_loc(end_date)
        first_rows, last_rows, offsets = self._compute_slices(
            start_idx,
            end_idx,
            assets,
        )
        return _read_bcolz_data(
            self._table,
            (end_idx - start_idx + 1, len(assets)),
            [column.name for column in columns],
            first_rows,
            last_rows,
            offsets,
        )

    def history_window(self, column, start_date, end_date, asset):
        start_idx = self.sid_day_index(asset, start_date)
        end_idx = self.sid_day_index(asset, end_date) + 1
        col = self._spot_col(column)
        window = col[start_idx:end_idx]
        if column != 'volume':
            window = window.astype(float64) * self.PRICE_ADJUSTMENT_FACTOR
        return window
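Annotation: a minimal usage sketch of `history_window`, the method graded in the header; the sid and dates are hypothetical, and both dates must be trading days in the table for the asset:

    # given reader = BcolzDailyBarReader('daily_bars.bcolz')
    closes = reader.history_window(
        'close',
        Timestamp('2015-06-01', tz='UTC'),
        Timestamp('2015-06-05', tz='UTC'),
        1,  # sid
    )
    # Price columns come back as floats scaled by PRICE_ADJUSTMENT_FACTOR
    # (uint32 thousandths -> dollars); 'volume' would be returned unscaled.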

    def _spot_col(self, colname):
        """
        Get the colname from daily_bar_table and read all of it into memory,
        caching the result.

        Parameters
        ----------
        colname : string
            The name of an OHLCV carray in the daily_bar_table.

        Returns
        -------
        array (uint32)
            The fully-read array for the carray in the daily_bar_table with
            the given colname.
        """
        try:
            col = self._spot_cols[colname]
        except KeyError:
            col = self._spot_cols[colname] = self._table[colname]
        return col

    def sid_day_index(self, sid, day):
        """
        Parameters
        ----------
        sid : int
            The asset identifier.
        day : datetime64-like
            Midnight of the day for which data is requested.

        Returns
        -------
        int
            Index into the data tape for the given sid and day.
            Raises a NoDataOnDate exception if the given day is outside of the
            date range of the equity.
        """
        try:
            day_loc = self._calendar.get_loc(day)
        except KeyError:
            raise NoDataOnDate("day={0} is outside of calendar={1}".format(
                day, self._calendar))
        offset = day_loc - self._calendar_offsets[sid]
        if offset < 0:
            raise NoDataOnDate(
                "No data on or before day={0} for sid={1}".format(
                    day, sid))
        ix = self._first_rows[sid] + offset
        if ix > self._last_rows[sid]:
            raise NoDataOnDate(
                "No data on or after day={0} for sid={1}".format(
                    day, sid))
        return ix

    def spot_price(self, sid, day, colname):
        """
        Parameters
        ----------
        sid : int
            The asset identifier.
        day : datetime64-like
            Midnight of the day for which data is requested.
        colname : string
            The price field, e.g. 'open', 'high', 'low', 'close', or 'volume'.

        Returns
        -------
        float
            The spot price for colname of the given sid on the given day.
            Raises a NoDataOnDate exception if the given day is outside of the
            date range of the equity.
            Returns -1 if the day is within the date range, but the price is
            0.
        """
        ix = self.sid_day_index(sid, day)
        price = self._spot_col(colname)[ix]
        if price == 0:
            return -1
        if colname != 'volume':
            return price * 0.001
        else:
            return price
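Annotation: a minimal end-to-end sketch of this reader; the rootdir path, sid, and date are hypothetical:

    reader = BcolzDailyBarReader('daily_bars.bcolz')
    day = Timestamp('2015-06-01', tz='UTC')
    close = reader.spot_price(1, day, 'close')    # float dollars, or -1 for a stored 0
    volume = reader.spot_price(1, day, 'volume')  # as-traded volume, unscaled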


class SQLiteAdjustmentWriter(object):
    """
    Writer for data to be read by SQLiteAdjustmentReader.

    Parameters
    ----------
    conn_or_path : str or sqlite3.Connection
        A handle to the target sqlite database.
    calendar : pandas.DatetimeIndex
        Trading calendar used to look up the session before a dividend's
        ex_date when computing dividend ratios.
    daily_bar_reader : BcolzDailyBarReader
        Reader used to look up the close price on the day before each
        dividend's ex_date.
    overwrite : bool, optional, default=False
        If True and conn_or_path is a string, remove any existing files at the
        given path before connecting.

    See Also
    --------
    SQLiteAdjustmentReader
    """

    def __init__(self, conn_or_path, calendar, daily_bar_reader,
                 overwrite=False):
        if isinstance(conn_or_path, sqlite3.Connection):
            self.conn = conn_or_path
        elif isinstance(conn_or_path, str):
            if overwrite and exists(conn_or_path):
                try:
                    remove(conn_or_path)
                except OSError as e:
                    if e.errno != ENOENT:
                        raise
            self.conn = sqlite3.connect(conn_or_path)
        else:
            raise TypeError("Unknown connection type %s" % type(conn_or_path))

        self._daily_bar_reader = daily_bar_reader
        self._calendar = calendar

    def write_frame(self, tablename, frame):
        """
        Write an adjustment frame (splits, mergers, or dividend ratios) to
        `tablename`, validating its columns and dtypes first.
        """
        if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    SQLITE_ADJUSTMENT_COLUMNS,
                    frame.columns.tolist(),
                )
            )
        elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES:
            raise ValueError(
                "Adjustment table %s not in %s" % (
                    tablename, SQLITE_ADJUSTMENT_TABLENAMES
                )
            )

        expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql(tablename, self.conn)

    def write_dividend_payouts(self, frame):
        """
        Write dividend payout data to SQLite table `dividend_payouts`.
        """
        if frozenset(frame.columns) != SQLITE_DIVIDEND_PAYOUT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    sorted(SQLITE_DIVIDEND_PAYOUT_COLUMNS),
                    sorted(frame.columns.tolist()),
                )
            )

        expected_dtypes = SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql('dividend_payouts', self.conn)

    def write_stock_dividend_payouts(self, frame):
        """
        Write stock dividend payout data to SQLite table
        `stock_dividend_payouts`.
        """
        if frozenset(frame.columns) != SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    sorted(SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS),
                    sorted(frame.columns.tolist()),
                )
            )

        expected_dtypes = SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql('stock_dividend_payouts', self.conn)

    def calc_dividend_ratios(self, dividends):
        """
        Calculate the ratios to apply to equities when looking back at pricing
        history so that the price is smoothed over the ex_date, when the
        market adjusts to the change in equity value due to the upcoming
        dividend.

        Returns
        -------
        DataFrame
            A frame in the same format as splits and mergers, with keys
            - sid, the id of the equity
            - effective_date, the date in seconds on which to apply the ratio.
            - ratio, the ratio to apply to backwards-looking pricing data.
        """
        ex_dates = dividends.ex_date.values

        sids = dividends.sid.values
        amounts = dividends.amount.values

        ratios = full(len(amounts), nan)

        daily_bar_reader = self._daily_bar_reader

        calendar = self._calendar

        effective_dates = full(len(amounts), -1, dtype=int64)

        for i, amount in enumerate(amounts):
            sid = sids[i]
            ex_date = ex_dates[i]
            day_loc = calendar.get_loc(ex_date)
            prev_close_date = calendar[day_loc - 1]
            try:
                prev_close = daily_bar_reader.spot_price(
                    sid, prev_close_date, 'close')
                # spot_price returns -1, not 0, when the stored close is 0,
                # so test against the sentinel to skip those days.
                if prev_close != -1:
                    ratio = 1.0 - amount / prev_close
                    ratios[i] = ratio
                    # only assign effective_date when data is found
                    effective_dates[i] = ex_date
            except NoDataOnDate:
                logger.warn("Couldn't compute ratio for dividend %s" % {
                    'sid': sid,
                    'ex_date': ex_date,
                    'amount': amount,
                })
                continue

        # Create a mask to filter out indices in the effective_date, sid, and
        # ratio vectors for which a ratio was not calculable.
        effective_mask = effective_dates != -1
        effective_dates = effective_dates[effective_mask]
        effective_dates = effective_dates.astype('datetime64[ns]').\
            astype('datetime64[s]').astype(uint32)
        sids = sids[effective_mask]
        ratios = ratios[effective_mask]

        return DataFrame({
            'sid': sids,
            'effective_date': effective_dates,
            'ratio': ratios,
        })
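Annotation: a worked example of the ratio arithmetic with hypothetical numbers:

    # For a $0.50 dividend against a previous close of $20.00:
    #     ratio = 1.0 - 0.50 / 20.00 == 0.975
    # so pricing data before the ex_date is multiplied by 0.975 when viewed
    # from after it.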

    def write_dividend_data(self, dividends, stock_dividends=None):
        """
        Write both dividend payouts and the derived price adjustment ratios.
        """

        # First, write the dividend payouts.
        dividend_payouts = dividends.copy()
        dividend_payouts['ex_date'] = dividend_payouts['ex_date'].values.\
            astype('datetime64[s]').astype(integer)
        dividend_payouts['record_date'] = \
            dividend_payouts['record_date'].values.astype('datetime64[s]').\
            astype(integer)
        dividend_payouts['declared_date'] = \
            dividend_payouts['declared_date'].values.astype('datetime64[s]').\
            astype(integer)
        dividend_payouts['pay_date'] = \
            dividend_payouts['pay_date'].values.astype('datetime64[s]').\
            astype(integer)

        self.write_dividend_payouts(dividend_payouts)

        if stock_dividends is not None:
            stock_dividend_payouts = stock_dividends.copy()
            stock_dividend_payouts['ex_date'] = \
                stock_dividend_payouts['ex_date'].values.\
                astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['record_date'] = \
                stock_dividend_payouts['record_date'].values.\
                astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['declared_date'] = \
                stock_dividend_payouts['declared_date'].\
                values.astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['pay_date'] = \
                stock_dividend_payouts['pay_date'].\
                values.astype('datetime64[s]').astype(integer)
        else:
            stock_dividend_payouts = DataFrame({
                'sid': array([], dtype=uint32),
                'record_date': array([], dtype=uint32),
                'ex_date': array([], dtype=uint32),
                'declared_date': array([], dtype=uint32),
                'pay_date': array([], dtype=uint32),
                'payment_sid': array([], dtype=uint32),
                'ratio': array([], dtype=float),
            })

        self.write_stock_dividend_payouts(stock_dividend_payouts)

        # Second, calculate ratios from the dividend payouts.
        dividend_ratios = self.calc_dividend_ratios(dividends)

        self.write_frame('dividends', dividend_ratios)

    def write(self, splits, mergers, dividends, stock_dividends=None):
        """
        Writes data to a SQLite file to be read by SQLiteAdjustmentReader.

        Parameters
        ----------
        splits : pandas.DataFrame
            DataFrame containing split data.
        mergers : pandas.DataFrame
            DataFrame containing merger data.
        dividends : pandas.DataFrame
            DataFrame containing dividend data.

        Notes
        -----
        The DataFrame inputs `splits` and `mergers` should both have
        the following columns:

        effective_date : int
            The date, represented as seconds since Unix epoch, on which the
            adjustment should be applied.
        ratio : float
            A value to apply to all data earlier than the effective date.
        sid : int
            The asset id associated with this adjustment.

        The ratio column is interpreted as follows:
        - For all adjustment types, multiply price fields ('open', 'high',
          'low', and 'close') by the ratio.
        - For **splits only**, **divide** volume by the adjustment ratio.

        The `dividends` DataFrame should have the following columns:

        sid : int
            The asset id associated with this adjustment.
        ex_date : datetime64
            The date on which an equity must be held to be eligible to receive
            payment.
        declared_date : datetime64
            The date on which the dividend is announced to the public.
        pay_date : datetime64
            The date on which the dividend is distributed.
        record_date : datetime64
            The date on which the stock ownership is checked to determine
            distribution of dividends.
        amount : float
            The cash amount paid for each share.

        Dividend ratios are calculated as
        1.0 - (dividend_value / "close on day prior to dividend ex_date").

        The `stock_dividends` DataFrame, which is optional, should have the
        following columns:

        sid : int
            The asset id associated with this adjustment.
        ex_date : datetime64
            The date on which an equity must be held to be eligible to receive
            payment.
        declared_date : datetime64
            The date on which the dividend is announced to the public.
        pay_date : datetime64
            The date on which the dividend is distributed.
        record_date : datetime64
            The date on which the stock ownership is checked to determine
            distribution of dividends.
        payment_sid : int
            The asset id of the shares that should be paid instead of cash.
        ratio : float
            The ratio of currently held shares in the held sid that should
            be paid with new shares of the payment_sid.

        Returns
        -------
        None

        See Also
        --------
        SQLiteAdjustmentReader : Consumer for the data written by this class.
        """
        self.write_frame('splits', splits)
        self.write_frame('mergers', mergers)
        self.write_dividend_data(dividends, stock_dividends)
        self.conn.execute(
            "CREATE INDEX splits_sids "
            "ON splits(sid)"
        )
        self.conn.execute(
            "CREATE INDEX splits_effective_date "
            "ON splits(effective_date)"
        )
        self.conn.execute(
            "CREATE INDEX mergers_sids "
            "ON mergers(sid)"
        )
        self.conn.execute(
            "CREATE INDEX mergers_effective_date "
            "ON mergers(effective_date)"
        )
        self.conn.execute(
            "CREATE INDEX dividends_sid "
            "ON dividends(sid)"
        )
        self.conn.execute(
            "CREATE INDEX dividends_effective_date "
            "ON dividends(effective_date)"
        )
        self.conn.execute(
            "CREATE INDEX dividend_payouts_sid "
            "ON dividend_payouts(sid)"
        )
        self.conn.execute(
            "CREATE INDEX dividend_payouts_ex_date "
            "ON dividend_payouts(ex_date)"
        )
        self.conn.execute(
            "CREATE INDEX stock_dividend_payouts_sid "
            "ON stock_dividend_payouts(sid)"
        )
        self.conn.execute(
            "CREATE INDEX stock_dividend_payouts_ex_date "
            "ON stock_dividend_payouts(ex_date)"
        )

    def close(self):
        self.conn.close()
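Annotation: a minimal usage sketch of this writer; the path and input frames are hypothetical, and `calendar` / `daily_bar_reader` are assumed to be the objects documented in the class parameters:

    writer = SQLiteAdjustmentWriter('adjustments.db', calendar,
                                    daily_bar_reader, overwrite=True)
    writer.write(splits_frame, mergers_frame, dividends_frame)
    writer.close()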


UNPAID_QUERY_TEMPLATE = """
SELECT sid, amount, pay_date from dividend_payouts
WHERE ex_date=? AND sid IN ({0})
"""

Dividend = namedtuple('Dividend', ['sid', 'amount', 'pay_date'])

UNPAID_STOCK_DIVIDEND_QUERY_TEMPLATE = """
SELECT sid, payment_sid, ratio, pay_date from stock_dividend_payouts
WHERE ex_date=? AND sid IN ({0})
"""

StockDividend = namedtuple(
    'StockDividend',
    ['sid', 'payment_sid', 'ratio', 'pay_date'])


class SQLiteAdjustmentReader(object):
    """
    Loads adjustments based on corporate actions from a SQLite database.

    Expects data written in the format output by `SQLiteAdjustmentWriter`.

    Parameters
    ----------
    conn : str or sqlite3.Connection
        Connection from which to load data.
    """

    def __init__(self, conn):
        if isinstance(conn, str):
            conn = sqlite3.connect(conn)
        self.conn = conn

    def load_adjustments(self, columns, dates, assets):
        return load_adjustments_from_sqlite(
            self.conn,
            [column.name for column in columns],
            dates,
            assets,
        )

    def get_adjustments_for_sid(self, table_name, sid):
        """
        Return [[effective_date (as a UTC Timestamp), ratio], ...] for the
        given sid from the given adjustment table.
        """
        t = (sid,)
        c = self.conn.cursor()
        adjustments_for_sid = c.execute(
            "SELECT effective_date, ratio FROM %s WHERE sid = ?" %
            table_name, t).fetchall()
        c.close()

        return [[Timestamp(adjustment[0], unit='s', tz='UTC'), adjustment[1]]
                for adjustment in
                adjustments_for_sid]

    def get_dividends_with_ex_date(self, assets, date):
        """
        Return the cash dividends for the given assets whose ex_date falls on
        the given date.
        """
        seconds = date.value / int(1e9)
        c = self.conn.cursor()

        query = UNPAID_QUERY_TEMPLATE.format(",".join(['?' for _ in assets]))
        t = (seconds,) + tuple(map(lambda x: int(x), assets))

        c.execute(query, t)

        rows = c.fetchall()
        c.close()
        divs = []
        for row in rows:
            div = Dividend(
                row[0], row[1], Timestamp(row[2], unit='s', tz='UTC'))
            divs.append(div)
        return divs

    def get_stock_dividends_with_ex_date(self, assets, date):
        """
        Return the stock dividends for the given assets whose ex_date falls on
        the given date.
        """
        seconds = date.value / int(1e9)
        c = self.conn.cursor()

        query = UNPAID_STOCK_DIVIDEND_QUERY_TEMPLATE.format(
            ",".join(['?' for _ in assets]))
        t = (seconds,) + tuple(map(lambda x: int(x), assets))

        c.execute(query, t)

        rows = c.fetchall()
        c.close()

        stock_divs = []
        for row in rows:
            stock_div = StockDividend(
                row[0], row[1], row[2], Timestamp(row[3], unit='s', tz='UTC'))
            stock_divs.append(stock_div)
        return stock_divs
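Annotation: a minimal usage sketch of this reader; the database path, sids, and date are hypothetical:

    reader = SQLiteAdjustmentReader('adjustments.db')
    splits_for_sid = reader.get_adjustments_for_sid('splits', 1)
    divs = reader.get_dividends_with_ex_date(
        [1, 2], Timestamp('2015-06-01', tz='UTC'))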