Completed
Pull Request — master (#875)
created by Eddie at 02:28

zipline.data.BcolzDailyBarReader.sid_day_index()   B

Complexity
    Conditions: 3

Size
    Total Lines: 28

Duplication
    Lines: 0
    Ratio: 0 %

Metric    Value
cc        3
dl        0
loc       28
rs        8.8571
# Copyright 2015 Quantopian, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from abc import (
    ABCMeta,
    abstractmethod,
)
from errno import ENOENT
from os import remove
from os.path import exists
import sqlite3

from bcolz import (
    carray,
    ctable,
)
from click import progressbar
from numpy import (
    array,
    int64,
    float64,
    floating,
    full,
    iinfo,
    integer,
    issubdtype,
    nan,
    uint32,
)
from pandas import (
    DataFrame,
    DatetimeIndex,
    read_csv,
    Timestamp,
)
from six import (
    iteritems,
    string_types,
    with_metaclass,
)

from ._equities import _compute_row_slices, _read_bcolz_data
from ._adjustments import load_adjustments_from_sqlite

import logbook
logger = logbook.Logger('UsEquityPricing')

OHLC = frozenset(['open', 'high', 'low', 'close'])
US_EQUITY_PRICING_BCOLZ_COLUMNS = [
    'open', 'high', 'low', 'close', 'volume', 'day', 'id'
]
SQLITE_ADJUSTMENT_COLUMNS = frozenset(['effective_date', 'ratio', 'sid'])
SQLITE_ADJUSTMENT_COLUMN_DTYPES = {
    'effective_date': integer,
    'ratio': floating,
    'sid': integer,
}
SQLITE_ADJUSTMENT_TABLENAMES = frozenset(['splits', 'dividends', 'mergers'])


SQLITE_DIVIDEND_PAYOUT_COLUMNS = frozenset(
    ['sid',
     'ex_date',
     'declared_date',
     'pay_date',
     'record_date',
     'amount'])
SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    'sid': integer,
    'ex_date': integer,
    'declared_date': integer,
    'record_date': integer,
    'pay_date': integer,
    'amount': float,
}


SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS = frozenset(
    ['sid',
     'ex_date',
     'declared_date',
     'record_date',
     'pay_date',
     'payment_sid',
     'ratio'])
SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES = {
    'sid': integer,
    'ex_date': integer,
    'declared_date': integer,
    'record_date': integer,
    'pay_date': integer,
    'payment_sid': integer,
    'ratio': float,
}
UINT32_MAX = iinfo(uint32).max
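

# The writer and reader classes below store OHLC prices as uint32 values equal
# to 1000 times the as-traded dollar price, and store dates as uint32 seconds
# since midnight UTC, Jan 1, 1970.  The helpers below are an illustrative
# sketch of that convention, added for exposition; they are not used elsewhere
# in this module.
def _encode_price_as_uint32(dollar_price):
    """Encode an as-traded dollar price using the 1000x uint32 convention."""
    return uint32(round(dollar_price * 1000))


def _decode_uint32_price(stored_value):
    """Invert ``_encode_price_as_uint32``: e.g. 10250 -> 10.25."""
    return stored_value * 0.001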


class NoDataOnDate(Exception):
    """
    Raised when a spot price cannot be found for the sid and date.
    """
    pass


class BcolzDailyBarWriter(with_metaclass(ABCMeta)):
    """
    Class capable of writing daily OHLCV data to disk in a format that can be
    read efficiently by BcolzDailyBarReader.

    See Also
    --------
    BcolzDailyBarReader : Consumer of the data written by this class.
    """

    @abstractmethod
    def gen_tables(self, assets):
        """
        Return an iterator of pairs of (asset_id, bcolz.ctable).
        """
        raise NotImplementedError()

    @abstractmethod
    def to_uint32(self, array, colname):
        """
        Convert raw column values produced by gen_tables into uint32 values.

        Parameters
        ----------
        array : np.array
            An array of raw values.
        colname : str, {'open', 'high', 'low', 'close', 'volume', 'day'}
            The name of the column being loaded.

        For output being read by the default BcolzDailyBarReader, data should
        be stored in the following manner:

        - Pricing columns (Open, High, Low, Close) should be stored as 1000 *
          as-traded dollar value.
        - Volume should be the as-traded volume.
        - Dates should be stored as seconds since midnight UTC, Jan 1, 1970.
        """
        raise NotImplementedError()
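
    # Editorial illustration (an assumption, not part of the abstract
    # contract): a concrete ``to_uint32`` would encode a close of 10.25 as
    # 10250, pass a volume of 1200 through as 1200, and encode midnight UTC
    # on 2014-01-02 as 1388620800 seconds since the epoch.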

    def write(self, filename, calendar, assets, show_progress=False):
        """
        Parameters
        ----------
        filename : str
            The location at which we should write our output.
        calendar : pandas.DatetimeIndex
            Calendar to use to compute asset calendar offsets.
        assets : pandas.Int64Index
            The assets for which to write data.
        show_progress : bool
            Whether or not to show a progress bar while writing.

        Returns
        -------
        table : bcolz.ctable
            The newly-written table.
        """
        _iterator = self.gen_tables(assets)
        if show_progress:
            pbar = progressbar(
                _iterator,
                length=len(assets),
                item_show_func=lambda i: i if i is None else str(i[0]),
                label="Merging asset files:",
            )
            with pbar as pbar_iterator:
                return self._write_internal(filename, calendar, pbar_iterator)
        return self._write_internal(filename, calendar, _iterator)

    def _write_internal(self, filename, calendar, iterator):
        """
        Internal implementation of write.

        `iterator` should be an iterator yielding pairs of (asset, ctable).
        """
        total_rows = 0
        first_row = {}
        last_row = {}
        calendar_offset = {}

        # Maps column name -> output carray.
        columns = {
            k: carray(array([], dtype=uint32))
            for k in US_EQUITY_PRICING_BCOLZ_COLUMNS
        }

        for asset_id, table in iterator:
            nrows = len(table)
            for column_name in columns:
                if column_name == 'id':
                    # We know what the content of this column is, so don't
                    # bother reading it.
                    columns['id'].append(full((nrows,), asset_id))
                    continue
                columns[column_name].append(
                    self.to_uint32(table[column_name][:], column_name)
                )

            # Bcolz doesn't support ints as keys in `attrs`, so convert
            # assets to strings for use as attr keys.
            asset_key = str(asset_id)

            # Calculate the index into the array of the first and last row
            # for this asset. This allows us to efficiently load single
            # assets when querying the data back out of the table.
            first_row[asset_key] = total_rows
            last_row[asset_key] = total_rows + nrows - 1
            total_rows += nrows

            # Calculate the number of trading days between the first date
            # in the stored data and the first date of **this** asset. This
            # offset is used for output alignment by the reader.

            # HACK: Index with a list so that we get back an array we can pass
            # to self.to_uint32.  We could try to extract this in the loop
            # above, but that makes the logic a lot messier.
            asset_first_day = self.to_uint32(table['day'][[0]], 'day')[0]
            calendar_offset[asset_key] = calendar.get_loc(
                Timestamp(asset_first_day, unit='s', tz='UTC'),
            )

        # This writes the table to disk.
        full_table = ctable(
            columns=[
                columns[colname]
                for colname in US_EQUITY_PRICING_BCOLZ_COLUMNS
            ],
            names=US_EQUITY_PRICING_BCOLZ_COLUMNS,
            rootdir=filename,
            mode='w',
        )
        full_table.attrs['first_row'] = first_row
        full_table.attrs['last_row'] = last_row
        full_table.attrs['calendar_offset'] = calendar_offset
        full_table.attrs['calendar'] = calendar.asi8.tolist()
        return full_table


class DailyBarWriterFromCSVs(BcolzDailyBarWriter):
    """
    BcolzDailyBarWriter constructed from a map from assets to CSV paths.

    Parameters
    ----------
    asset_map : dict
        A map from asset_id -> path to csv with data for that asset.

    CSVs should have the following columns:
        day : datetime64
        open : float64
        high : float64
        low : float64
        close : float64
        volume : int64
    """
    _csv_dtypes = {
        'open': float64,
        'high': float64,
        'low': float64,
        'close': float64,
        'volume': float64,
    }

    def __init__(self, asset_map):
        self._asset_map = asset_map

    def gen_tables(self, assets):
        """
        Read CSVs as DataFrames from our asset map.
        """
        dtypes = self._csv_dtypes
        for asset in assets:
            path = self._asset_map.get(asset)
            if path is None:
                raise KeyError("No path supplied for asset %s" % asset)
            data = read_csv(path, parse_dates=['day'], dtype=dtypes)
            yield asset, ctable.fromdataframe(data)

    def to_uint32(self, array, colname):
        arrmax = array.max()
        if colname in OHLC:
            self.check_uint_safe(arrmax * 1000, colname)
            return (array * 1000).astype(uint32)
        elif colname == 'volume':
            self.check_uint_safe(arrmax, colname)
            return array.astype(uint32)
        elif colname == 'day':
            nanos_per_second = (1000 * 1000 * 1000)
            self.check_uint_safe(arrmax.view(int) / nanos_per_second, colname)
            return (array.view(int) / nanos_per_second).astype(uint32)

    @staticmethod
    def check_uint_safe(value, colname):
        if value >= UINT32_MAX:
            raise ValueError(
                "Value %s from column '%s' is too large" % (value, colname)
            )
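

# A minimal usage sketch (hypothetical file names and asset ids, added for
# exposition, not part of the original module): write a bcolz daily-bar table
# from one CSV per asset.  ``calendar`` is assumed to be a pandas
# DatetimeIndex of trading days covering every date present in the CSVs.
def _example_write_daily_bars_from_csvs(calendar):
    asset_map = {1: 'asset_1.csv', 2: 'asset_2.csv'}  # hypothetical paths
    writer = DailyBarWriterFromCSVs(asset_map)
    # Returns the newly written bcolz.ctable rooted at 'daily_bars.bcolz'.
    return writer.write('daily_bars.bcolz', calendar, sorted(asset_map))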


class BcolzDailyBarReader(object):
    """
    Reader for raw pricing data written by BcolzDailyBarWriter.

    A Bcolz CTable is comprised of Columns and Attributes.

    Columns
    -------
    The table with which this loader interacts contains the following columns:

    ['open', 'high', 'low', 'close', 'volume', 'day', 'id'].

    The data in these columns is interpreted as follows:

    - Price columns ('open', 'high', 'low', 'close') are interpreted as 1000 *
      as-traded dollar value.
    - Volume is interpreted as as-traded volume.
    - Day is interpreted as seconds since midnight UTC, Jan 1, 1970.
    - Id is the asset id of the row.

    The data in each column is grouped by asset and then sorted by day within
    each asset block.

    The table is built to represent a long time range of data, e.g. ten years
    of equity data, so the lengths of the asset blocks are not equal to one
    another. The blocks are clipped to the known start and end date of each
    asset to cut down on the number of empty values that would need to be
    included to make a regular/cubic dataset.

    When read across columns, the open, high, low, close, and volume values
    at the same index represent the same asset and day.

    Attributes
    ----------
    The table with which this loader interacts contains the following
    attributes:

    first_row : dict
        Map from asset_id -> index of first row in the dataset with that id.
    last_row : dict
        Map from asset_id -> index of last row in the dataset with that id.
    calendar_offset : dict
        Map from asset_id -> calendar index of first row.
    calendar : list[int64]
        Calendar used to compute offsets, in asi8 format (ns since EPOCH).

    We use first_row and last_row together to quickly find ranges of rows to
    load when reading an asset's data into memory.

    We use calendar_offset and calendar to orient loaded blocks within a
    range of queried dates.
    """
    def __init__(self, table):
        if isinstance(table, string_types):
            table = ctable(rootdir=table, mode='r')

        self._table = table
        self._calendar = DatetimeIndex(table.attrs['calendar'], tz='UTC')
        self._first_rows = {
            int(asset_id): start_index
            for asset_id, start_index in iteritems(table.attrs['first_row'])
        }
        self._last_rows = {
            int(asset_id): end_index
            for asset_id, end_index in iteritems(table.attrs['last_row'])
        }
        self._calendar_offsets = {
            int(id_): offset
            for id_, offset in iteritems(table.attrs['calendar_offset'])
        }
        # Cache of fully read np.array for the carrays in the daily bar table.
        # raw_array does not use the same cache, but it could.
        # Need to test keeping the entire array in memory for the course of a
        # process first.
        self._spot_cols = {}

    def _compute_slices(self, start_idx, end_idx, assets):
        """
        Compute the raw row indices to load for each asset on a query for the
        given dates after applying a shift.

        Parameters
        ----------
        start_idx : int
            Index of first date for which we want data.
        end_idx : int
            Index of last date for which we want data.
        assets : pandas.Int64Index
            Assets for which we want to compute row indices.

        Returns
        -------
        A 3-tuple of (first_rows, last_rows, offsets):
        first_rows : np.array[intp]
            Array with length == len(assets) containing the index of the first
            row to load for each asset in `assets`.
        last_rows : np.array[intp]
            Array with length == len(assets) containing the index of the last
            row to load for each asset in `assets`.
        offset : np.array[intp]
            Array with length == len(assets) containing the index in a buffer
            of length `dates` corresponding to the first row of each asset.

            The value of offset[i] will be 0 if asset[i] existed at the start
            of a query.  Otherwise, offset[i] will be equal to the number of
            entries in `dates` for which the asset did not yet exist.
        """
        # The core logic here is implemented in Cython for efficiency.
        return _compute_row_slices(
            self._first_rows,
            self._last_rows,
            self._calendar_offsets,
            start_idx,
            end_idx,
            assets,
        )

    def load_raw_arrays(self, columns, start_date, end_date, assets):
        # Assumes that the given dates are actually in calendar.
        start_idx = self._calendar.get_loc(start_date)
        end_idx = self._calendar.get_loc(end_date)
        first_rows, last_rows, offsets = self._compute_slices(
            start_idx,
            end_idx,
            assets,
        )
        return _read_bcolz_data(
            self._table,
            (end_idx - start_idx + 1, len(assets)),
            [column.name for column in columns],
            first_rows,
            last_rows,
            offsets,
        )

    def _spot_col(self, colname):
        """
        Get the colname from daily_bar_table and read all of it into memory,
        caching the result.

        Parameters
        ----------
        colname : string
            The name of an OHLCV carray in the daily_bar_table.

        Returns
        -------
        array (uint32)
            Full read array of the carray in the daily_bar_table with the
            given colname.
        """
        try:
            col = self._spot_cols[colname]
        except KeyError:
            col = self._spot_cols[colname] = self._table[colname][:]
        return col

    def sid_day_index(self, sid, day):
        """
        Parameters
        ----------
        sid : int
            The asset identifier.
        day : datetime64-like
            Midnight of the day for which data is requested.

        Returns
        -------
        int
            Index into the data tape for the given sid and day.
            Raises a NoDataOnDate exception if the given day falls outside
            the date range of the given sid.
        """
        day_loc = self._calendar.get_loc(day)
        offset = day_loc - self._calendar_offsets[sid]
        if offset < 0:
            raise NoDataOnDate(
                "No data on or before day={0} for sid={1}".format(
                    day, sid))
        ix = self._first_rows[sid] + offset
        if ix > self._last_rows[sid]:
            raise NoDataOnDate(
                "No data on or after day={0} for sid={1}".format(
                    day, sid))
        return ix
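
    # Worked example (editorial illustration with made-up numbers): if sid 1
    # has calendar_offset 5 and first_row 1000, a query for the day at
    # calendar position 8 gives offset 8 - 5 = 3 and returns row index
    # 1000 + 3 = 1003, provided 1003 does not exceed last_row for that sid.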

    def spot_price(self, sid, day, colname):
        """
        Parameters
        ----------
        sid : int
            The asset identifier.
        day : datetime64-like
            Midnight of the day for which data is requested.
        colname : string
            The price field. e.g. ('open', 'high', 'low', 'close', 'volume')

        Returns
        -------
        float
            The spot price for colname of the given sid on the given day.
            Raises a NoDataOnDate exception if the given day falls outside
            the date range of the given sid.
            Returns -1 if the day is within the date range, but the price is
            0.
        """
        ix = self.sid_day_index(sid, day)
        price = self._spot_col(colname)[ix]
        if price == 0:
            return -1
        if colname != 'volume':
            return price * 0.001
        else:
            return price
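

# A minimal reading sketch (hypothetical rootdir, sid, and date, added for
# exposition): open a reader over a previously written bcolz directory and
# look up a single close price.  The result is in dollars because spot_price
# rescales the stored uint32 values by 0.001.
def _example_read_spot_close(rootdir='daily_bars.bcolz'):
    reader = BcolzDailyBarReader(rootdir)
    return reader.spot_price(1, Timestamp('2014-01-02', tz='UTC'), 'close')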


class SQLiteAdjustmentWriter(object):
    """
    Writer for data to be read by SQLiteAdjustmentReader

    Parameters
    ----------
    conn_or_path : str or sqlite3.Connection
        A handle to the target sqlite database.
    overwrite : bool, optional, default=False
        If True and conn_or_path is a string, remove any existing files at the
        given path before connecting.

    See Also
    --------
    SQLiteAdjustmentReader
    """

    def __init__(self, conn_or_path, calendar, daily_bar_reader,
                 overwrite=False):
        if isinstance(conn_or_path, sqlite3.Connection):
            self.conn = conn_or_path
        elif isinstance(conn_or_path, str):
            if overwrite and exists(conn_or_path):
                try:
                    remove(conn_or_path)
                except OSError as e:
                    if e.errno != ENOENT:
                        raise
            self.conn = sqlite3.connect(conn_or_path)
        else:
            raise TypeError("Unknown connection type %s" % type(conn_or_path))

        self._daily_bar_reader = daily_bar_reader
        self._calendar = calendar

    def write_frame(self, tablename, frame):
        if frozenset(frame.columns) != SQLITE_ADJUSTMENT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    SQLITE_ADJUSTMENT_COLUMNS,
                    frame.columns.tolist(),
                )
            )
        elif tablename not in SQLITE_ADJUSTMENT_TABLENAMES:
            raise ValueError(
                "Adjustment table %s not in %s" % (
                    tablename, SQLITE_ADJUSTMENT_TABLENAMES
                )
            )

        expected_dtypes = SQLITE_ADJUSTMENT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql(tablename, self.conn)

    def write_dividend_payouts(self, frame):
        """
        Write dividend payout data to SQLite table `dividend_payouts`.
        """
        if frozenset(frame.columns) != SQLITE_DIVIDEND_PAYOUT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    sorted(SQLITE_DIVIDEND_PAYOUT_COLUMNS),
                    sorted(frame.columns.tolist()),
                )
            )

        expected_dtypes = SQLITE_DIVIDEND_PAYOUT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql('dividend_payouts', self.conn)

    def write_stock_dividend_payouts(self, frame):
        if frozenset(frame.columns) != SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS:
            raise ValueError(
                "Unexpected frame columns:\n"
                "Expected Columns: %s\n"
                "Received Columns: %s" % (
                    sorted(SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMNS),
                    sorted(frame.columns.tolist()),
                )
            )

        expected_dtypes = SQLITE_STOCK_DIVIDEND_PAYOUT_COLUMN_DTYPES
        actual_dtypes = frame.dtypes
        for colname, expected in iteritems(expected_dtypes):
            actual = actual_dtypes[colname]
            if not issubdtype(actual, expected):
                raise TypeError(
                    "Expected data of type {expected} for column '{colname}', "
                    "but got {actual}.".format(
                        expected=expected,
                        colname=colname,
                        actual=actual,
                    )
                )
        return frame.to_sql('stock_dividend_payouts', self.conn)

    def calc_dividend_ratios(self, dividends):
        """
        Calculate the ratios to apply to equities when looking back at pricing
        history so that the price is smoothed over the ex_date, when the market
        adjusts to the change in equity value due to the upcoming dividend.

        Returns
        -------
        DataFrame
            A frame in the same format as splits and mergers, with keys
            - sid, the id of the equity
            - effective_date, the date in seconds on which to apply the ratio.
            - ratio, the ratio to apply to backwards looking pricing data.
        """
        ex_dates = dividends.ex_date.values

        sids = dividends.sid.values
        amounts = dividends.amount.values

        ratios = full(len(amounts), nan)

        daily_bar_reader = self._daily_bar_reader

        calendar = self._calendar

        effective_dates = full(len(amounts), -1, dtype=int64)

        for i, amount in enumerate(amounts):
            sid = sids[i]
            ex_date = ex_dates[i]
            day_loc = calendar.get_loc(ex_date)
            prev_close_date = calendar[day_loc - 1]
            try:
                prev_close = daily_bar_reader.spot_price(
                    sid, prev_close_date, 'close')
                if prev_close != 0.0:
                    ratio = 1.0 - amount / prev_close
                    ratios[i] = ratio
                    # only assign effective_date when data is found
                    effective_dates[i] = ex_date
            except NoDataOnDate:
                logger.warn("Couldn't compute ratio for dividend %s" % {
                    'sid': sid,
                    'ex_date': ex_date,
                    'amount': amount,
                })
                continue

        # Create a mask to filter out indices in the effective_date, sid, and
        # ratio vectors for which a ratio was not calculable.
        effective_mask = effective_dates != -1
        effective_dates = effective_dates[effective_mask]
        effective_dates = effective_dates.astype('datetime64[ns]').\
            astype('datetime64[s]').astype(uint32)
        sids = sids[effective_mask]
        ratios = ratios[effective_mask]

        return DataFrame({
            'sid': sids,
            'effective_date': effective_dates,
            'ratio': ratios,
        })
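
    # Worked example (editorial illustration with made-up numbers): a $0.50
    # dividend with a prior close of $10.00 produces a ratio of
    # 1.0 - 0.50 / 10.00 = 0.95, which is applied to prices on days before
    # the ex_date when adjusting historical data.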

    def write_dividend_data(self, dividends, stock_dividends=None):
        """
        Write both dividend payouts and the derived price adjustment ratios.
        """

        # First write the dividend payouts.
        dividend_payouts = dividends.copy()
        dividend_payouts['ex_date'] = dividend_payouts['ex_date'].values.\
            astype('datetime64[s]').astype(integer)
        dividend_payouts['record_date'] = \
            dividend_payouts['record_date'].values.astype('datetime64[s]').\
            astype(integer)
        dividend_payouts['declared_date'] = \
            dividend_payouts['declared_date'].values.astype('datetime64[s]').\
            astype(integer)
        dividend_payouts['pay_date'] = \
            dividend_payouts['pay_date'].values.astype('datetime64[s]').\
            astype(integer)

        self.write_dividend_payouts(dividend_payouts)

        if stock_dividends is not None:
            stock_dividend_payouts = stock_dividends.copy()
            stock_dividend_payouts['ex_date'] = \
                stock_dividend_payouts['ex_date'].values.\
                astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['record_date'] = \
                stock_dividend_payouts['record_date'].values.\
                astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['declared_date'] = \
                stock_dividend_payouts['declared_date'].\
                values.astype('datetime64[s]').astype(integer)
            stock_dividend_payouts['pay_date'] = \
                stock_dividend_payouts['pay_date'].\
                values.astype('datetime64[s]').astype(integer)
        else:
            stock_dividend_payouts = DataFrame({
                'sid': array([], dtype=uint32),
                'record_date': array([], dtype=uint32),
                'ex_date': array([], dtype=uint32),
                'declared_date': array([], dtype=uint32),
                'pay_date': array([], dtype=uint32),
                'payment_sid': array([], dtype=uint32),
                'ratio': array([], dtype=float),
            })

        self.write_stock_dividend_payouts(stock_dividend_payouts)

        # Second, calculate ratios from the dividend payouts.

        dividend_ratios = self.calc_dividend_ratios(dividends)

        self.write_frame('dividends', dividend_ratios)
770
    def write(self, splits, mergers, dividends, stock_dividends=None):
771
        """
772
        Writes data to a SQLite file to be read by SQLiteAdjustmentReader.
773
774
        Parameters
775
        ----------
776
        splits : pandas.DataFrame
777
            Dataframe containing split data.
778
        mergers : pandas.DataFrame
779
            DataFrame containing merger data.
780
        dividends : pandas.DataFrame
781
            DataFrame containing dividend data.
782
783
        Notes
784
        -----
785
        DataFrame input (`splits`, `mergers`) should all have
786
        the following columns:
787
788
        effective_date : int
789
            The date, represented as seconds since Unix epoch, on which the
790
            adjustment should be applied.
791
        ratio : float
792
            A value to apply to all data earlier than the effective date.
793
        sid : int
794
            The asset id associated with this adjustment.
795
796
        The ratio column is interpreted as follows:
797
        - For all adjustment types, multiply price fields ('open', 'high',
798
          'low', and 'close') by the ratio.
799
        - For **splits only**, **divide** volume by the adjustment ratio.
800
801
        DataFrame input, 'dividends' should have the following columns:
802
803
        sid : int
804
            The asset id associated with this adjustment.
805
        ex_date : datetime64
806
            The date on which an equity must be held to be eligible to receive
807
            payment.
808
        declared_date : datetime64
809
            The date on which the dividend is announced to the public.
810
        pay_date : datetime64
811
            The date on which the dividend is distributed.
812
        record_date : datetime64
813
            The date on which the stock ownership is checked to determine
814
            distribution of dividends.
815
        amount : float
816
            The cash amount paid for each share.
817
818
        Dividend ratios are calculated as
819
        1.0 - (dividend_value / "close on day prior to dividend ex_date").
820
821
822
        DataFrame input, 'stock_dividends' should have the following columns:
823
824
        sid : int
825
            The asset id associated with this adjustment.
826
        ex_date : datetime64
827
            The date on which an equity must be held to be eligible to receive
828
            payment.
829
        declared_date : datetime64
830
            The date on which the dividend is announced to the public.
831
        pay_date : datetime64
832
            The date on which the dividend is distributed.
833
        record_date : datetime64
834
            The date on which the stock ownership is checked to determine
835
            distribution of dividends.
836
        payment_sid : int
837
            The asset id of the shares that should be paid instead of cash.
838
        ratio: float
839
            The ratio of currently held shares in the held sid that should
840
            be paid with new shares of the payment_sid.
841
842
        stock_dividends is optional.
843
844
845
        Returns
846
        -------
847
        None
848
849
        See Also
850
        --------
851
        SQLiteAdjustmentReader : Consumer for the data written by this class
852
        """
853
        self.write_frame('splits', splits)
854
        self.write_frame('mergers', mergers)
855
        self.write_dividend_data(dividends, stock_dividends)
856
        self.conn.execute(
857
            "CREATE INDEX splits_sids "
858
            "ON splits(sid)"
859
        )
860
        self.conn.execute(
861
            "CREATE INDEX splits_effective_date "
862
            "ON splits(effective_date)"
863
        )
864
        self.conn.execute(
865
            "CREATE INDEX mergers_sids "
866
            "ON mergers(sid)"
867
        )
868
        self.conn.execute(
869
            "CREATE INDEX mergers_effective_date "
870
            "ON mergers(effective_date)"
871
        )
872
        self.conn.execute(
873
            "CREATE INDEX dividends_sid "
874
            "ON dividends(sid)"
875
        )
876
        self.conn.execute(
877
            "CREATE INDEX dividends_effective_date "
878
            "ON dividends(effective_date)"
879
        )
880
        self.conn.execute(
881
            "CREATE INDEX dividend_payouts_sid "
882
            "ON dividend_payouts(sid)"
883
        )
884
        self.conn.execute(
885
            "CREATE INDEX dividends_payouts_ex_date "
886
            "ON dividend_payouts(ex_date)"
887
        )
888
        self.conn.execute(
889
            "CREATE INDEX stock_dividend_payouts_sid "
890
            "ON stock_dividend_payouts(sid)"
891
        )
892
        self.conn.execute(
893
            "CREATE INDEX stock_dividends_payouts_ex_date "
894
            "ON stock_dividend_payouts(ex_date)"
895
        )
896
897
    def close(self):
898
        self.conn.close()
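

# A minimal writing sketch (hypothetical path and inputs, added for
# exposition): given a trading calendar, a BcolzDailyBarReader, and
# adjustment DataFrames shaped as described in ``write`` above, persist the
# adjustments and their indexes to a new SQLite file.
def _example_write_adjustments(calendar, daily_bar_reader,
                               splits, mergers, dividends):
    writer = SQLiteAdjustmentWriter('adjustments.db', calendar,
                                    daily_bar_reader, overwrite=True)
    try:
        writer.write(splits, mergers, dividends)
    finally:
        writer.close()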


class SQLiteAdjustmentReader(object):
    """
    Loads adjustments based on corporate actions from a SQLite database.

    Expects data written in the format output by `SQLiteAdjustmentWriter`.

    Parameters
    ----------
    conn : str or sqlite3.Connection
        Connection from which to load data.
    """

    def __init__(self, conn):
        if isinstance(conn, str):
            conn = sqlite3.connect(conn)
        self.conn = conn

    def load_adjustments(self, columns, dates, assets):
        return load_adjustments_from_sqlite(
            self.conn,
            [column.name for column in columns],
            dates,
            assets,
        )
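

# A minimal loading sketch (hypothetical arguments, added for exposition):
# read adjustments for a set of query columns, dates, and assets.  ``columns``
# is assumed to be a sequence of objects exposing a ``.name`` attribute, as
# expected by ``load_adjustments`` above.
def _example_load_adjustments(columns, dates, assets, path='adjustments.db'):
    reader = SQLiteAdjustmentReader(path)
    return reader.load_adjustments(columns, dates, assets)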