diff --git a/tests/pipeline/base.py b/tests/pipeline/base.py index 94acd2de..e949f5c9 100644 --- a/tests/pipeline/base.py +++ b/tests/pipeline/base.py @@ -123,6 +123,17 @@ class BaseUSEquityPipelineTestCase(WithTradingSessions, hooks=NoHooks(), ) + def run_terms(self, terms, initial_workspace, mask): + start_date, end_date = mask.index[[0, -1]] + graph = ExecutionPlan( + domain=US_EQUITIES, + terms=terms, + start_date=start_date, + end_date=end_date, + ) + + return self.run_graph(graph, initial_workspace, mask) + def check_terms(self, terms, expected, @@ -133,15 +144,8 @@ class BaseUSEquityPipelineTestCase(WithTradingSessions, Compile the given terms into a TermGraph, compute it with initial_workspace, and compare the results with ``expected``. """ - start_date, end_date = mask.index[[0, -1]] - graph = ExecutionPlan( - domain=US_EQUITIES, - terms=terms, - start_date=start_date, - end_date=end_date, - ) + results = self.run_terms(terms, initial_workspace, mask) - results = self.run_graph(graph, initial_workspace, mask) for key, (res, exp) in dzip_exact(results, expected).items(): check(res, exp) diff --git a/tests/pipeline/test_factor.py b/tests/pipeline/test_factor.py index 83ca2f3d..0b267a1e 100644 --- a/tests/pipeline/test_factor.py +++ b/tests/pipeline/test_factor.py @@ -19,6 +19,7 @@ from numpy import ( log1p, nan, ones, + ones_like, rot90, where, ) @@ -38,7 +39,10 @@ from zipline.pipeline.factors import ( Returns, PercentChange, ) -from zipline.pipeline.factors.factor import winsorize as zp_winsorize +from zipline.pipeline.factors.factor import ( + summary_funcs, + winsorize as zp_winsorize, +) from zipline.testing import ( check_allclose, check_arrays, @@ -51,9 +55,11 @@ from zipline.testing.fixtures import ( ) from zipline.testing.predicates import assert_equal from zipline.utils.numpy_utils import ( + as_column, categorical_dtype, datetime64ns_dtype, float64_dtype, + ignore_nanwarnings, int64_dtype, NaTns, ) @@ -737,7 +743,7 @@ class 
FactorTestCase(BaseUSEquityPipelineTestCase): [0.000, nan, 1.250, -1.250], [-0.500, 0.500, nan, 0.000], [-0.500, 0.500, 0.000, nan]] - ) + ), } # Changing the classifier dtype shouldn't affect anything. expected['grouped_str'] = expected['grouped'] @@ -1504,3 +1510,230 @@ class TestSpecialCases(WithUSEquityPricingPipelineEngine, 'daily': DailyReturns(), 'manual_daily': Returns(window_length=2), }) + + +class SummaryTestCase(BaseUSEquityPipelineTestCase, ZiplineTestCase): + + @parameter_space( + seed=[1, 2, 3], + mask=[ + np.zeros((10, 5), dtype=bool), + ones((10, 5), dtype=bool), + eye(10, 5, dtype=bool), + ~eye(10, 5, dtype=bool), + ] + ) + def test_summary_methods(self, seed, mask): + """Test that summary funcs work the same as numpy NaN-aware funcs. + """ + rand = np.random.RandomState(seed) + shape = (10, 5) + data = rand.randn(*shape) + data[~mask] = np.nan + + workspace = {F(): data} + terms = { + 'mean': F().mean(), + 'sum': F().sum(), + 'median': F().median(), + 'min': F().min(), + 'max': F().max(), + 'stddev': F().stddev(), + 'notnull_count': F().notnull_count(), + } + + with ignore_nanwarnings(): + expected = { + 'mean': as_column(np.nanmean(data, axis=1)), + 'sum': as_column(np.nansum(data, axis=1)), + 'median': as_column(np.nanmedian(data, axis=1)), + 'min': as_column(np.nanmin(data, axis=1)), + 'max': as_column(np.nanmax(data, axis=1)), + 'stddev': as_column(np.nanstd(data, axis=1)), + 'notnull_count': as_column((~np.isnan(data)).sum(axis=1)), + } + + # Make sure we have test coverage for all summary funcs. 
+ self.assertEqual(set(expected), summary_funcs.names) + + self.check_terms( + terms=terms, + expected=expected, + initial_workspace=workspace, + mask=self.build_mask(ones(shape)), + ) + + @parameter_space( + seed=[4, 5, 6], + mask=[ + np.zeros((10, 5), dtype=bool), + ones((10, 5), dtype=bool), + eye(10, 5, dtype=bool), + ~eye(10, 5, dtype=bool), + ] + ) + def test_built_in_vs_summary(self, seed, mask): + """Test that summary funcs match normalization functions. + """ + rand = np.random.RandomState(seed) + shape = (10, 5) + data = rand.randn(*shape) + data[~mask] = np.nan + + workspace = {F(): data} + terms = { + 'demean': F().demean(), + 'alt_demean': F() - F().mean(), + + 'zscore': F().zscore(), + 'alt_zscore': (F() - F().mean()) / F().stddev(), + + 'mean': F().mean(), + 'alt_mean': F().sum() / F().notnull_count(), + } + + result = self.run_terms( + terms, + initial_workspace=workspace, + mask=self.build_mask(ones(shape)), + ) + assert_equal(result['demean'], result['alt_demean']) + assert_equal(result['zscore'], result['alt_zscore']) + assert_equal(result['mean'], result['alt_mean']) + + @parameter_space( + seed=[100, 200, 300], + mask=[ + np.zeros((10, 5), dtype=bool), + ones((10, 5), dtype=bool), + eye(10, 5, dtype=bool), + ~eye(10, 5, dtype=bool), + ] + ) + def test_complex_expression(self, seed, mask): + rand = np.random.RandomState(seed) + shape = (10, 5) + data = rand.randn(*shape) + data[~mask] = np.nan + + workspace = {F(): data} + terms = { + 'rescaled': (F() - F().min()) / (F().max() - F().min()), + } + + with ignore_nanwarnings(): + mins = as_column(np.nanmin(data, axis=1)) + maxes = as_column(np.nanmax(data, axis=1)) + + expected = { + 'rescaled': (data - mins) / (maxes - mins), + } + + self.check_terms( + terms, + expected, + initial_workspace=workspace, + mask=self.build_mask(ones(shape)), + ) + + @parameter_space( + seed=[40, 41, 42], + mask=[ + np.zeros((10, 5), dtype=bool), + ones((10, 5), dtype=bool), + eye(10, 5, dtype=bool), + ~eye(10, 5, dtype=bool), + ], + # Three ways to mask: + # 1. 
Don't mask. + # 2. Mask by passing mask parameter to summary methods. + # 3. Mask by having non-True values in the root mask. + mask_mode=('none', 'param', 'root'), + ) + def test_summaries_after_fillna(self, seed, mask, mask_mode): + rand = np.random.RandomState(seed) + shape = (10, 5) + + # Create data with a mix of NaN and non-NaN values. + with_nans = np.where(mask, rand.randn(*shape), np.nan) + + # Create a version with NaNs filled with -1s. + with_minus_1s = np.where(mask, with_nans, -1) + + kwargs = {} + workspace = {F(): with_nans} + + # Call each summary method with mask=Mask(). + if mask_mode == 'param': + kwargs['mask'] = Mask() + workspace[Mask()] = mask + + # Take the mean after applying a fillna of -1 to ensure that we ignore + # masked locations properly. + terms = { + 'mean': F().fillna(-1).mean(**kwargs), + 'sum': F().fillna(-1).sum(**kwargs), + 'median': F().fillna(-1).median(**kwargs), + 'min': F().fillna(-1).min(**kwargs), + 'max': F().fillna(-1).max(**kwargs), + 'stddev': F().fillna(-1).stddev(**kwargs), + 'notnull_count': F().fillna(-1).notnull_count(**kwargs), + } + + with ignore_nanwarnings(): + if mask_mode == 'none': + # If we aren't masking, we should expect the results to see the + # -1s. + expected_input = with_minus_1s + else: + # If we are masking, we should expect the results to see NaNs. + expected_input = with_nans + + expected = { + 'mean': as_column(np.nanmean(expected_input, axis=1)), + 'sum': as_column(np.nansum(expected_input, axis=1)), + 'median': as_column(np.nanmedian(expected_input, axis=1)), + 'min': as_column(np.nanmin(expected_input, axis=1)), + 'max': as_column(np.nanmax(expected_input, axis=1)), + 'stddev': as_column(np.nanstd(expected_input, axis=1)), + 'notnull_count': as_column( + (~np.isnan(expected_input)).sum(axis=1), + ), + } + + # Make sure we have test coverage for all summary funcs. 
+ self.assertEqual(set(expected), summary_funcs.names) + + if mask_mode == 'root': + root_mask = self.build_mask(mask) + else: + root_mask = self.build_mask(ones_like(mask)) + + self.check_terms( + terms=terms, + expected=expected, + initial_workspace=workspace, + mask=root_mask, + ) + + def test_repr(self): + + class MyFactor(CustomFactor): + window_length = 1 + inputs = () + + def recursive_repr(self): + return "MyFactor()" + + f = MyFactor() + + for method in summary_funcs.names: + summarized = getattr(f, method)() + self.assertEqual( + repr(summarized), + "MyFactor().{}()".format(method), + ) + self.assertEqual( + summarized.recursive_repr(), + "MyFactor().{}()".format(method), + ) diff --git a/zipline/pipeline/factors/factor.py b/zipline/pipeline/factors/factor.py index 9ff8e65f..1dcf1e3a 100644 --- a/zipline/pipeline/factors/factor.py +++ b/zipline/pipeline/factors/factor.py @@ -52,11 +52,20 @@ from zipline.pipeline.sentinels import NotSpecified, NotSpecifiedType from zipline.pipeline.term import AssetExists, ComputableTerm, Term from zipline.utils.functional import with_doc, with_name from zipline.utils.input_validation import expect_types -from zipline.utils.math_utils import nanmean, nanstd +from zipline.utils.math_utils import ( + nanmax, + nanmean, + nanmedian, + nanmin, + nanstd, + nansum, +) from zipline.utils.numpy_utils import ( + as_column, bool_dtype, coerce_to_dtype, float64_dtype, + is_missing, ) from zipline.utils.sharedoc import templated_docstring @@ -382,6 +391,73 @@ CORRELATION_METHOD_NOTE = dedent( ) +class summary_funcs(object): + """Namespace of functions meant to be used with DailySummary. 
+ """ + + @staticmethod + def mean(a, missing_value): + return nanmean(a, axis=1) + + @staticmethod + def stddev(a, missing_value): + return nanstd(a, axis=1) + + @staticmethod + def max(a, missing_value): + return nanmax(a, axis=1) + + @staticmethod + def min(a, missing_value): + return nanmin(a, axis=1) + + @staticmethod + def median(a, missing_value): + return nanmedian(a, axis=1) + + @staticmethod + def sum(a, missing_value): + return nansum(a, axis=1) + + @staticmethod + def notnull_count(a, missing_value): + return (~is_missing(a, missing_value)).sum(axis=1) + + names = {k for k in locals() if not k.startswith('_')} + + +def summary_method(name): + func = getattr(summary_funcs, name) + + @expect_types(mask=(Filter, NotSpecifiedType)) + @float64_only + def f(self, mask=NotSpecified): + """Create a 1-dimensional factor computing the {} of self, each day. + + Parameters + ---------- + mask : zipline.pipeline.Filter, optional + A Filter representing assets to consider when computing results. + If supplied, we ignore asset/date pairs where ``mask`` produces + ``False``. + + Returns + ------- + result : zipline.pipeline.Factor + """ + return DailySummary( + func, + self, + mask=mask, + dtype=self.dtype, + ) + + f.__name__ = func.__name__ + f.__doc__ = f.__doc__.format(f.__name__) + + return f + + class Factor(RestrictedDTypeMixin, ComputableTerm): """ Pipeline API expression producing a numerical or date-valued output. @@ -447,6 +523,11 @@ class Factor(RestrictedDTypeMixin, ComputableTerm): __truediv__ = clsdict['__div__'] __rtruediv__ = clsdict['__rdiv__'] + # Add summary functions. + clsdict.update( + {name: summary_method(name) for name in summary_funcs.names}, + ) + del clsdict # don't pollute the class namespace with this. eq = binary_operator('==') @@ -1685,6 +1766,51 @@ class Latest(LatestMixin, CustomFactor): out[:] = data[-1] +class DailySummary(SingleInputMixin, Factor): + """1D Factor that computes a summary statistic across all assets. 
+ """ + ndim = 1 + window_length = 0 + params = ('func',) + + def __new__(cls, func, input_, mask, dtype): + # TODO: We should be able to support datetime64 as well, but that + # requires extra care for handling NaT. + if dtype != float64_dtype: + raise AssertionError( + "DailySummary only supports float64 dtype, got {}" + .format(dtype), + ) + + return super(DailySummary, cls).__new__( + cls, + inputs=[input_], + dtype=dtype, + missing_value=nan, + window_safe=input_.window_safe, + func=func, + mask=mask, + ) + + def _compute(self, arrays, dates, assets, mask): + func = self.params['func'] + + data = arrays[0] + data[~mask] = nan + if not isnan(self.inputs[0].missing_value): + data[data == self.inputs[0].missing_value] = nan + + return as_column(func(data, self.inputs[0].missing_value)) + + def __repr__(self): + return "{}.{}()".format( + self.inputs[0].recursive_repr(), + self.params['func'].__name__, + ) + + graph_repr = recursive_repr = __repr__ + + # Functions to be passed to GroupedRowTransform. These aren't defined inline # because the transformation function is part of the instance hash key. def demean(row): diff --git a/zipline/utils/math_utils.py b/zipline/utils/math_utils.py index 16fdb99d..a7b2df4d 100644 --- a/zipline/utils/math_utils.py +++ b/zipline/utils/math_utils.py @@ -57,6 +57,7 @@ try: nanmin = bn.nanmin nanargmax = bn.nanargmax nanargmin = bn.nanargmin + nanmedian = bn.nanmedian except ImportError: # slower numpy import numpy as np @@ -67,6 +68,7 @@ except ImportError: nanmin = np.nanmin nanargmax = np.nanargmax nanargmin = np.nanargmin + nanmedian = np.nanmedian def round_if_near_integer(a, epsilon=1e-4):