Are aggregate extensions the best way to solve this issue or should I try to do something with grouping?

Developer https://www.devze.com 2023-01-26 07:13 Source: web
I am working with a stock data feed and I want to build minute bars (candlesticks) out of the data. I think I have a pretty good query that gives me the data I need, but I am not sure it is the best way to go about it. Please share any comments or criticisms you have; I am open to any input.

Input on the left and output on the right

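using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using Microsoft.ComplexEventProcessing;
using Microsoft.ComplexEventProcessing.Extensibility;
using Microsoft.ComplexEventProcessing.Linq;

class StockTick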
{
    public DateTime Timestamp { get; set; }

    public int Id { get; set; }

    public decimal LastPrice { get; set; }
}

class FirstUda : CepAggregate<decimal, decimal>
{
    public override decimal GenerateOutput(IEnumerable<decimal> payloads)
    {
        return payloads.First();
    }
}

class LastUda : CepAggregate<decimal, decimal>
{
    public override decimal GenerateOutput(IEnumerable<decimal> payloads)
    {
        return payloads.Last();
    }
}

public static class CepExtensions
{
    [CepUserDefinedAggregate(typeof(FirstUda))]
    public static decimal First<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, decimal>> map)
    {
        throw CepUtility.DoNotCall();
    }

    [CepUserDefinedAggregate(typeof(LastUda))]
    public static decimal Last<InputT>(this CepWindow<InputT> window, Expression<Func<InputT, decimal>> map)
    {
        throw CepUtility.DoNotCall();
    }
}

class Program
{
    static void Main(string[] args)
    {
        using (var server = Server.Create("Default"))
        {
            var app = server.CreateApplication("app");

            var source = GetStockTick();

            var input = source.ToPointStream(
                app,
                t => PointEvent.CreateInsert(t.Timestamp, t),
                AdvanceTimeSettings.IncreasingStartTime);

            var minuteWindows = input.AlterEventDuration(e => TimeSpan.FromTicks(TimeSpan.TicksPerMinute - (e.StartTime.Ticks % TimeSpan.TicksPerMinute)));

            var highLowTicks = from e in minuteWindows
                               group e by e.Id into g
                               from win in g.SnapshotWindow(SnapshotWindowOutputPolicy.Clip)
                               select new
                               {
                                   Id = g.Key,
                                   Timestamp = win.Max(e => e.Timestamp.AddSeconds(e.Timestamp.Second * -1)),
                                   OpenPrice = win.First(e => e.LastPrice),
                                   HighPrice = win.Max(t => t.LastPrice),
                                   LowPrice = win.Min(t => t.LastPrice),
                                   ClosePrice = win.Last(e => e.LastPrice)
                               };

            foreach (var hl in highLowTicks.ToEnumerable())
            {
                Console.WriteLine(hl);
            }

            Console.ReadKey();
        }
    }

    private static IEnumerable<StockTick> GetStockTick()
    {
        var ticks = new List<StockTick>();

        var baseTime = new DateTime(2010, 1, 1, 12, 0, 0);

        ticks.Add(new StockTick() { Id = 1, Timestamp = baseTime.AddSeconds(1), LastPrice = 10 });
        ticks.Add(new StockTick() { Id = 1, Timestamp = baseTime.AddSeconds(15), LastPrice = 8 });
        ticks.Add(new StockTick() { Id = 1, Timestamp = baseTime.AddSeconds(30), LastPrice = 12 });
        ticks.Add(new StockTick() { Id = 1, Timestamp = baseTime.AddSeconds(45), LastPrice = 11 });
        ticks.Add(new StockTick() { Id = 1, Timestamp = baseTime.AddSeconds(65), LastPrice = 13 });

        ticks.Add(new StockTick() { Id = 2, Timestamp = baseTime.AddSeconds(11), LastPrice = 35 });
        ticks.Add(new StockTick() { Id = 2, Timestamp = baseTime.AddSeconds(13), LastPrice = 37 });
        ticks.Add(new StockTick() { Id = 2, Timestamp = baseTime.AddSeconds(50), LastPrice = 22 });
        ticks.Add(new StockTick() { Id = 2, Timestamp = baseTime.AddSeconds(55), LastPrice = 32 });
        ticks.Add(new StockTick() { Id = 2, Timestamp = baseTime.AddSeconds(61), LastPrice = 36 });

        return ticks.OrderBy(t => t.Timestamp);
    }
}
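
The AlterEventDuration call in the query stretches each point event so that it ends at the next whole-minute boundary. As a rough illustration of that arithmetic only, here is a minimal sketch in plain C#, outside StreamInsight. The class `MinuteClip` and the helper `DurationToMinuteEnd` are hypothetical names introduced for this example; the expression inside the helper is the one from the query.

```csharp
using System;

static class MinuteClip
{
    // Hypothetical helper mirroring the lambda passed to AlterEventDuration:
    // the time left between a start time and the next whole-minute boundary.
    public static TimeSpan DurationToMinuteEnd(DateTime start)
    {
        return TimeSpan.FromTicks(
            TimeSpan.TicksPerMinute - (start.Ticks % TimeSpan.TicksPerMinute));
    }

    static void Main()
    {
        var baseTime = new DateTime(2010, 1, 1, 12, 0, 0);

        // Offsets taken from the sample ticks for Id 1 in the question.
        foreach (var seconds in new[] { 1, 15, 65 })
        {
            var start = baseTime.AddSeconds(seconds);
            var duration = DurationToMinuteEnd(start);
            Console.WriteLine("start {0:T}, duration {1}, end {2:T}",
                start, duration, start + duration);
        }
    }
}
```

For the ticks at 12:00:01, 12:00:15 and 12:01:05 the durations come out as 59, 45 and 55 seconds, so every event in a given minute ends at the same instant. Note that a tick falling exactly on a minute boundary gets a full 60-second duration rather than zero.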


Ryan, looks good - just checking whether this is the intended semantics:

You will get several results within each minute. For every input event, you will get a result event with the same start time, containing the first price (in this minute) and the current price, as well as the min and max that have occurred in this minute so far. So only the very last result event within each minute will contain the first, last, min, and max of the entire minute. Correct? You don't want just one event for each entire minute?

Roman
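
To make the difference between the two result shapes concrete, here is a minimal sketch in plain LINQ to Objects. It does not use StreamInsight and only approximates the semantics described in the answer, assuming each Id has distinct timestamps. All type and method names (`Tick`, `Bar`, `BarSketch`, `RunningBars`, `FinalBars`) are hypothetical and introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

class Tick
{
    public int Id;
    public DateTime Timestamp;
    public decimal LastPrice;
}

class Bar
{
    public int Id;
    public DateTime Minute;
    public decimal Open, High, Low, Close;

    public override string ToString()
    {
        return string.Format("Id={0} {1:T} O={2} H={3} L={4} C={5}",
            Id, Minute, Open, High, Low, Close);
    }
}

static class BarSketch
{
    // Truncate a timestamp to the start of its minute.
    static DateTime FloorToMinute(DateTime t)
    {
        return t.AddTicks(-(t.Ticks % TimeSpan.TicksPerMinute));
    }

    // Build one bar from ticks that are already in time order.
    static Bar ToBar(int id, DateTime minute, IList<Tick> ordered)
    {
        return new Bar
        {
            Id = id,
            Minute = minute,
            Open = ordered.First().LastPrice,
            High = ordered.Max(t => t.LastPrice),
            Low = ordered.Min(t => t.LastPrice),
            Close = ordered.Last().LastPrice
        };
    }

    // Ticks grouped per Id and minute, each group kept in time order.
    static IEnumerable<List<Tick>> Groups(IEnumerable<Tick> ticks)
    {
        return ticks
            .OrderBy(t => t.Timestamp)
            .GroupBy(t => new { t.Id, Minute = FloorToMinute(t.Timestamp) })
            .Select(g => g.ToList());
    }

    // One bar per input tick, covering the minute "so far".
    public static List<Bar> RunningBars(IEnumerable<Tick> ticks)
    {
        var bars = new List<Bar>();
        foreach (var g in Groups(ticks))
            for (int n = 1; n <= g.Count; n++)
                bars.Add(ToBar(g[0].Id, FloorToMinute(g[0].Timestamp), g.Take(n).ToList()));
        return bars;
    }

    // One bar per Id and minute, covering the whole minute.
    public static List<Bar> FinalBars(IEnumerable<Tick> ticks)
    {
        return Groups(ticks)
            .Select(g => ToBar(g[0].Id, FloorToMinute(g[0].Timestamp), g))
            .ToList();
    }

    static void Main()
    {
        // The sample ticks for Id 1 from the question.
        var baseTime = new DateTime(2010, 1, 1, 12, 0, 0);
        var ticks = new[] { 1, 15, 30, 45, 65 }
            .Zip(new decimal[] { 10, 8, 12, 11, 13 },
                 (s, p) => new Tick { Id = 1, Timestamp = baseTime.AddSeconds(s), LastPrice = p })
            .ToList();

        Console.WriteLine("Running bars: " + RunningBars(ticks).Count);
        foreach (var bar in FinalBars(ticks)) Console.WriteLine(bar);
    }
}
```

With the Id 1 sample data, `RunningBars` yields five bars (one per tick) while `FinalBars` yields two: open 10, high 12, low 8, close 11 for the 12:00 minute, and a single-tick bar at 13 for 12:01. If only the second shape is wanted in StreamInsight itself, a one-minute tumbling window is probably the place to look, though that variant is not shown or tested here.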
