adding aggregation code

This commit is contained in:
Jason Kulatunga
2021-10-24 13:07:12 -07:00
parent 975c034925
commit 9878985fa3
4 changed files with 979 additions and 18 deletions
@@ -10,6 +10,7 @@ import (
"github.com/analogj/scrutiny/webapp/backend/pkg/models/measurements"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/domain"
"github.com/sirupsen/logrus"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
@@ -39,6 +40,7 @@ import (
//}
func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.FieldLogger) (DeviceRepo, error) {
backgroundContext := context.Background()
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Gorm/SQLite setup
@@ -70,19 +72,49 @@ func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.Field
// if no token is provided, but we have a valid server, we're going to assume this is the first setup of our server.
// we will initialize with a predetermined username & password, that you should change.
// metrics bucket will have a retention period of 8 days (since it will be down-sampled once a week)
// in hours (24hours * 8 days) = 192
onboardingResponse, err := client.Setup(
context.Background(),
backgroundContext,
appConfig.GetString("web.influxdb.init_username"),
appConfig.GetString("web.influxdb.init_password"),
appConfig.GetString("web.influxdb.org"),
appConfig.GetString("web.influxdb.bucket"),
0)
192)
if err != nil {
return nil, err
}
appConfig.Set("web.influxdb.token", *onboardingResponse.Auth.Token)
//todo: determine if we should write the config file out here.
orgId, err := client.OrganizationsAPI().FindOrganizationByID(backgroundContext, appConfig.GetString("web.influxdb.org"))
if err != nil {
return nil, err
}
//create buckets (used for downsampling)
// metrics_weekly bucket will have a retention period of 8+1 weeks (since it will be down-sampled once a month)
// in seconds (60seconds * 60minutes * 24hours * 7 days * 9 weeks) = 5_443_200
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_weekly", appConfig.GetString("web.influxdb.bucket")), domain.RetentionRule{EverySeconds: 5_443_200})
if err != nil {
return nil, err
}
// metrics_monthly bucket will have a retention period of 24+1 months (since it will be down-sampled once a year)
// in seconds (60seconds * 60minutes * 24hours * 7 days * (52 + 52 + 4)weeks) = 65_318_400
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_monthly", appConfig.GetString("web.influxdb.bucket")), domain.RetentionRule{EverySeconds: 65_318_400})
if err != nil {
return nil, err
}
// metrics_yearly bucket will have an infinite retention period
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_yearly", appConfig.GetString("web.influxdb.bucket")))
if err != nil {
return nil, err
}
}
// Use blocking write client for writes to desired bucket
@@ -104,10 +136,14 @@ func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.Field
influxClient: client,
influxWriteApi: writeAPI,
influxQueryApi: queryAPI,
influxTaskApi: taskAPI,
influxTaskApi: taskAPI,
gormClient: database,
}
// Initialize Background Tasks
err = deviceRepo.InitTasks(backgroundContext)
if err != nil {
return nil, err
}
return &deviceRepo, nil
}
@@ -117,7 +153,7 @@ type scrutinyRepository struct {
influxWriteApi api.WriteAPIBlocking
influxQueryApi api.QueryAPI
influxTaskApi api.TasksAPI
influxTaskApi api.TasksAPI
influxClient influxdb2.Client
gormClient *gorm.DB
@@ -132,20 +168,98 @@ func (sr *scrutinyRepository) Close() error {
// Tasks
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *scrutinyRepository) InitTasks(ctx context.Context) error {
flux := ""
weeklyTaskName := "tsk-weekly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, weeklyTaskName); missingTask != nil {
//weekly on Sunday at 1:00am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, sr.DownsampleScript("weekly"), "0 1 * * 0", sr.appConfig.GetString("web.influxdb.org"))
if err != nil {
return err
}
}
//weekly on Sunday at 1:00am
sr.influxTaskApi.CreateTaskWithCron(ctx, "tsk-weekly-aggr", flux, "0 1 * * 0", sr.appConfig.GetString("web.influxdb.org"))
monthlyTaskName := "tsk-monthly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, monthlyTaskName); missingTask != nil {
//monthly on first day of the month at 1:30am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, sr.DownsampleScript("monthly"), "30 1 1 * *", sr.appConfig.GetString("web.influxdb.org"))
if err != nil {
return err
}
}
//monthly on first day of the month at 1:30am
sr.influxTaskApi.CreateTaskWithCron(ctx, "tsk-monthly-aggr", flux, "30 1 1 * *", sr.appConfig.GetString("web.influxdb.org"))
//yearly on the frist day of the year at 2:00am
sr.influxTaskApi.CreateTaskWithCron(ctx, "tsk-yearly-aggr", flux, "0 2 1 1 *", sr.appConfig.GetString("web.influxdb.org"))
yearlyTaskName := "tsk-monthly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, yearlyTaskName); missingTask != nil {
//yearly on the first day of the year at 2:00am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, sr.DownsampleScript("yearly"), "0 2 1 1 *", sr.appConfig.GetString("web.influxdb.org"))
if err != nil {
return err
}
}
return nil
}
func (sr *scrutinyRepository) DownsampleScript(aggregate string) (string, error){
func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string {
var sourceBucket string // the source of the data
var destBucket string // the destination for the aggregated data
var rangeStart string
var rangeEnd string
var aggWindow string
switch aggregationType {
case "weekly":
sourceBucket = sr.appConfig.GetString("web.influxdb.bucket")
destBucket = fmt.Sprintf("%s_weekly", sr.appConfig.GetString("web.influxdb.bucket"))
rangeStart = "-2w"
rangeEnd = "-1w"
aggWindow = "1w"
case "monthly":
sourceBucket = fmt.Sprintf("%s_weekly", sr.appConfig.GetString("web.influxdb.bucket"))
destBucket = fmt.Sprintf("%s_monthly", sr.appConfig.GetString("web.influxdb.bucket"))
rangeStart = "-2mo"
rangeEnd = "-1mo"
aggWindow = "1mo"
case "yearly":
sourceBucket = fmt.Sprintf("%s_monthly", sr.appConfig.GetString("web.influxdb.bucket"))
destBucket = fmt.Sprintf("%s_yearly", sr.appConfig.GetString("web.influxdb.bucket"))
rangeStart = "-2y"
rangeEnd = "-1y"
aggWindow = "1y"
}
return fmt.Sprintf(`
sourceBucket = "%s"
rangeStart = %s
rangeEnd = %s
aggWindow = %s
destBucket = "%s"
destOrg = "%s"
smart_data = from(bucket: sourceBucket)
|> range(start: rangeStart, stop: rangeEnd)
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] !~ /(raw_string|_measurement|device_protocol|device_wwn|attribute_id|name|status|when_failed)/)
|> last()
|> yield(name: "last")
smart_data
|> aggregateWindow(fn: mean, every: aggWindow)
|> to(bucket: destBucket, org: destOrg)
temp_data = from(bucket: sourceBucket)
|> range(start: rangeStart, stop: rangeEnd)
|> filter(fn: (r) => r["_measurement"] == "temp")
|> last()
|> yield(name: "mean")
temp_data
|> aggregateWindow(fn: mean, every: aggWindow)
|> to(bucket: destBucket, org: destOrg)
`,
sourceBucket,
rangeStart,
rangeEnd,
aggWindow,
destBucket,
sr.appConfig.GetString("web.influxdb.org"),
)
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////