ensure that all buckets are created during init. Remove all references to "name" field for attributes (shoudl come from metadata instead). Status is now an int64 (0 is passing).

This commit is contained in:
Jason Kulatunga
2021-10-24 16:01:53 -07:00
parent 9878985fa3
commit 31b5dfa038
8 changed files with 116 additions and 108 deletions
@@ -74,47 +74,20 @@ func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.Field
// we will initialize with a predetermined username & password, that you should change.
// metrics bucket will have a retention period of 8 days (since it will be down-sampled once a week)
// in hours (24hours * 8 days) = 192
// in seconds (60seconds * 60minutes * 24hours * 15 days) = 1_296_000 (see EnsureBucket() function)
onboardingResponse, err := client.Setup(
backgroundContext,
appConfig.GetString("web.influxdb.init_username"),
appConfig.GetString("web.influxdb.init_password"),
appConfig.GetString("web.influxdb.org"),
appConfig.GetString("web.influxdb.bucket"),
192)
0)
if err != nil {
return nil, err
}
appConfig.Set("web.influxdb.token", *onboardingResponse.Auth.Token)
//todo: determine if we should write the config file out here.
orgId, err := client.OrganizationsAPI().FindOrganizationByID(backgroundContext, appConfig.GetString("web.influxdb.org"))
if err != nil {
return nil, err
}
//create buckets (used for downsampling)
// metrics_weekly bucket will have a retention period of 8+1 weeks (since it will be down-sampled once a month)
// in seconds (60seconds * 60minutes * 24hours * 7 days * 9 weeks) = 5_443_200
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_weekly", appConfig.GetString("web.influxdb.bucket")), domain.RetentionRule{EverySeconds: 5_443_200})
if err != nil {
return nil, err
}
// metrics_monthly bucket will have a retention period of 24+1 months (since it will be down-sampled once a year)
// in seconds (60seconds * 60minutes * 24hours * 7 days * (52 + 52 + 4)weeks) = 65_318_400
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_monthly", appConfig.GetString("web.influxdb.bucket")), domain.RetentionRule{EverySeconds: 65_318_400})
if err != nil {
return nil, err
}
// metrics_yearly bucket will have an infinite retention period
_, err = client.BucketsAPI().CreateBucketWithName(backgroundContext, orgId, fmt.Sprintf("%s_yearly", appConfig.GetString("web.influxdb.bucket")))
if err != nil {
return nil, err
}
}
// Use blocking write client for writes to desired bucket
@@ -139,8 +112,20 @@ func NewScrutinyRepository(appConfig config.Interface, globalLogger logrus.Field
influxTaskApi: taskAPI,
gormClient: database,
}
orgInfo, err := client.OrganizationsAPI().FindOrganizationByName(backgroundContext, appConfig.GetString("web.influxdb.org"))
if err != nil {
return nil, err
}
// Initialize Buckets (if necessary)
err = deviceRepo.EnsureBuckets(backgroundContext, orgInfo)
if err != nil {
return nil, err
}
// Initialize Background Tasks
err = deviceRepo.InitTasks(backgroundContext)
err = deviceRepo.EnsureTasks(backgroundContext, *orgInfo.Id)
if err != nil {
return nil, err
}
@@ -164,32 +149,81 @@ func (sr *scrutinyRepository) Close() error {
return nil
}
func (sr *scrutinyRepository) EnsureBuckets(ctx context.Context, org *domain.Organization) error {
mainBucket := sr.appConfig.GetString("web.influxdb.bucket")
if foundMainBucket, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, mainBucket); foundErr != nil {
// metrics bucket will have a retention period of (14+1) 15 days (since it will be down-sampled once a week)
// in seconds (60seconds * 60minutes * 24hours * 15 days) = 1_296_000
_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, mainBucket, domain.RetentionRule{EverySeconds: 1_296_000})
if err != nil {
return err
}
} else {
//correctly set the retention period for the main bucket (cant do it during creation)
foundMainBucket.RetentionRules = domain.RetentionRules{domain.RetentionRule{EverySeconds: 1_296_000}}
sr.influxClient.BucketsAPI().UpdateBucket(ctx, foundMainBucket)
}
//create buckets (used for downsampling)
weeklyBucket := fmt.Sprintf("%s_weekly", sr.appConfig.GetString("web.influxdb.bucket"))
if _, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, weeklyBucket); foundErr != nil {
// metrics_weekly bucket will have a retention period of 8+1 weeks (since it will be down-sampled once a month)
// in seconds (60seconds * 60minutes * 24hours * 7 days * 9 weeks) = 5_443_200
_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, weeklyBucket, domain.RetentionRule{EverySeconds: 5_443_200})
if err != nil {
return err
}
}
monthlyBucket := fmt.Sprintf("%s_monthly", sr.appConfig.GetString("web.influxdb.bucket"))
if _, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, monthlyBucket); foundErr != nil {
// metrics_monthly bucket will have a retention period of 24+1 months (since it will be down-sampled once a year)
// in seconds (60seconds * 60minutes * 24hours * 7 days * (52 + 52 + 4)weeks) = 65_318_400
_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, monthlyBucket, domain.RetentionRule{EverySeconds: 65_318_400})
if err != nil {
return err
}
}
yearlyBucket := fmt.Sprintf("%s_yearly", sr.appConfig.GetString("web.influxdb.bucket"))
if _, foundErr := sr.influxClient.BucketsAPI().FindBucketByName(ctx, yearlyBucket); foundErr != nil {
// metrics_yearly bucket will have an infinite retention period
_, err := sr.influxClient.BucketsAPI().CreateBucketWithName(ctx, org, yearlyBucket)
if err != nil {
return err
}
}
return nil
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Tasks
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
func (sr *scrutinyRepository) InitTasks(ctx context.Context) error {
func (sr *scrutinyRepository) EnsureTasks(ctx context.Context, orgID string) error {
weeklyTaskName := "tsk-weekly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, weeklyTaskName); missingTask != nil {
if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: weeklyTaskName}); findErr == nil && len(found) == 0 {
//weekly on Sunday at 1:00am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, sr.DownsampleScript("weekly"), "0 1 * * 0", sr.appConfig.GetString("web.influxdb.org"))
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, weeklyTaskName, sr.DownsampleScript("weekly"), "0 1 * * 0", orgID)
if err != nil {
return err
}
}
monthlyTaskName := "tsk-monthly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, monthlyTaskName); missingTask != nil {
if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: monthlyTaskName}); findErr == nil && len(found) == 0 {
//monthly on first day of the month at 1:30am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, sr.DownsampleScript("monthly"), "30 1 1 * *", sr.appConfig.GetString("web.influxdb.org"))
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, monthlyTaskName, sr.DownsampleScript("monthly"), "30 1 1 * *", orgID)
if err != nil {
return err
}
}
yearlyTaskName := "tsk-monthly-aggr"
if _, missingTask := sr.influxTaskApi.GetTaskByID(ctx, yearlyTaskName); missingTask != nil {
yearlyTaskName := "tsk-yearly-aggr"
if found, findErr := sr.influxTaskApi.FindTasks(ctx, &api.TaskFilter{Name: yearlyTaskName}); findErr == nil && len(found) == 0 {
//yearly on the first day of the year at 2:00am
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, sr.DownsampleScript("yearly"), "0 2 1 1 *", sr.appConfig.GetString("web.influxdb.org"))
_, err := sr.influxTaskApi.CreateTaskWithCron(ctx, yearlyTaskName, sr.DownsampleScript("yearly"), "0 2 1 1 *", orgID)
if err != nil {
return err
}
@@ -235,7 +269,7 @@ func (sr *scrutinyRepository) DownsampleScript(aggregationType string) string {
smart_data = from(bucket: sourceBucket)
|> range(start: rangeStart, stop: rangeEnd)
|> filter(fn: (r) => r["_measurement"] == "smart" )
|> filter(fn: (r) => r["_field"] !~ /(raw_string|_measurement|device_protocol|device_wwn|attribute_id|name|status|when_failed)/)
|> filter(fn: (r) => r["_field"] !~ /(raw_string|_measurement|device_protocol|device_wwn|attribute_id|when_failed)/)
|> last()
|> yield(name: "last")