You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
416 lines
12 KiB
416 lines
12 KiB
package main
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
|
|
"github.com/gofrs/uuid"
|
|
"google.golang.org/grpc/metadata"
|
|
"gorm.io/gorm"
|
|
"whiteboxsystems.nl/openkvpoc/openkv"
|
|
"whiteboxsystems.nl/openkvpoc/sharedmodel"
|
|
"whiteboxsystems.nl/openkvpoc/whiteboxservice/model"
|
|
)
|
|
|
|
// Sentinel errors returned by the request guards and handlers in this file.
// The message texts are returned to callers and are kept unchanged.
var (
	// errNotAuthorized is returned when request metadata or the
	// authorization token is missing or does not match a stored credential.
	errNotAuthorized = fmt.Errorf("Not Authorized")

	// errInvalidService is returned when no service exists for the
	// requested service ID.
	errInvalidService = fmt.Errorf("Invalid service")

	// errActiveServiceConfig is returned when the connection has no enabled
	// service config for the requested service.
	errActiveServiceConfig = fmt.Errorf("Service not activated")
)
|
|
|
|
type OpenKVServer struct {
|
|
openkv.UnimplementedOpenKVServer
|
|
data *gorm.DB
|
|
}
|
|
|
|
func (srv *OpenKVServer) LoadData(location string) error {
|
|
var err error
|
|
srv.data, err = model.GetDB(location)
|
|
return err
|
|
}
|
|
|
|
func requireConnection(db *gorm.DB, ctx context.Context) (*sharedmodel.Connection, error) {
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
log.Printf("No metadata")
|
|
return nil, errNotAuthorized
|
|
}
|
|
|
|
connection := &sharedmodel.Connection{}
|
|
|
|
if a, ok := md["authorization"]; !ok {
|
|
log.Printf("No token provided")
|
|
return nil, errNotAuthorized
|
|
} else {
|
|
if err := db.Preload("AuthMethod").Raw(`
|
|
SELECT conn.*
|
|
FROM connections conn
|
|
JOIN auth_configs a on conn.auth_config_id = a.id WHERE a.method = ? and a.raw = ?
|
|
`, openkv.AuthMethod_APIToken, a[0]).Scan(connection).Error; err != nil {
|
|
log.Printf("Invalid token; err: %v;", err)
|
|
return nil, errNotAuthorized
|
|
}
|
|
}
|
|
return connection, nil
|
|
}
|
|
|
|
func requireService(db *gorm.DB, conn *sharedmodel.Connection, serviceID string) (*sharedmodel.ServiceConfig, error) {
|
|
service := &sharedmodel.Service{}
|
|
if err := db.Where("service_id = ?", serviceID).First(service).Error; err != nil {
|
|
return nil, errInvalidService
|
|
}
|
|
|
|
srvConfig := &sharedmodel.ServiceConfig{}
|
|
|
|
if err := db.Where("connection_id = ? and enabled = ? and service_id = ?", conn.ID, true, service.ID).First(srvConfig).Error; err != nil {
|
|
return nil, errActiveServiceConfig
|
|
}
|
|
|
|
return srvConfig, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) GetMetadata(
|
|
ctx context.Context, in *openkv.GetMetadataRequest,
|
|
) (*openkv.GetMetadataResponse, error) {
|
|
log.Printf("Got metadata request")
|
|
|
|
services := []*openkv.ServiceDefinition{}
|
|
presentServices := []sharedmodel.Service{}
|
|
srv.data.Find(&presentServices)
|
|
|
|
for _, service := range presentServices {
|
|
services = append(services, &openkv.ServiceDefinition{
|
|
Name: service.Name,
|
|
Description: service.Description,
|
|
Id: service.ServiceID,
|
|
SubscriptionPolicy: service.SubscriptionPolicy,
|
|
ConsentPolicy: service.ConsentPolicy,
|
|
FetchProtocols: service.GetFetchProtocols(),
|
|
PushProtocols: service.GetPushProtocols(),
|
|
})
|
|
}
|
|
|
|
resp := &openkv.GetMetadataResponse{
|
|
Supplier: "Whitebox Systems",
|
|
System: "Whitebox",
|
|
Services: services,
|
|
Success: true,
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) Register(
|
|
ctx context.Context, in *openkv.RegisterRequest,
|
|
) (*openkv.RegisterResponse, error) {
|
|
ref, _ := uuid.NewV4()
|
|
psk, _ := uuid.NewV4()
|
|
|
|
reg := &sharedmodel.Registration{
|
|
Reference: ref.String(),
|
|
OrganisationId: in.OrganisationId,
|
|
OrganisationIdSystem: in.OrganisationIdSystem,
|
|
OrganisationDisplayName: in.OrganisationDisplayName,
|
|
PSK: psk.String()[0:6],
|
|
Status: sharedmodel.RegistrationStatusPending,
|
|
}
|
|
|
|
reg.SetAuthConfig(in.Auth)
|
|
|
|
srv.data.Create(reg)
|
|
|
|
resp := &openkv.RegisterResponse{
|
|
Reference: ref.String(),
|
|
Success: true,
|
|
}
|
|
|
|
log.Printf("Got registration request from %v; ref: %v; PSK: %v", reg.OrganisationDisplayName, reg.Reference, reg.PSK)
|
|
return resp, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) CompleteRegistration(
|
|
ctx context.Context, in *openkv.CompleteRegistrationRequest,
|
|
) (*openkv.CompleteRegistrationResponse, error) {
|
|
registration := &sharedmodel.Registration{}
|
|
|
|
if err := srv.data.Preload("AuthConfig").Where("reference = ? and status = ?", in.Reference, sharedmodel.RegistrationStatusPending).First(registration).Error; err != nil {
|
|
log.Printf("Invalid ref")
|
|
return nil, errNotAuthorized
|
|
}
|
|
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
log.Printf("No metadata")
|
|
return nil, errNotAuthorized
|
|
}
|
|
|
|
// The keys within metadata.MD are normalized to lowercase.
|
|
// See: https://godoc.org/google.golang.org/grpc/metadata#New
|
|
if a, ok := md["authorization"]; !ok {
|
|
log.Printf("No token provided")
|
|
return nil, errNotAuthorized
|
|
} else {
|
|
if a[0] != registration.AuthConfig.Raw {
|
|
log.Printf("Invalid token; eXpected: %v; got: %v", registration.AuthConfig.Raw, a[0])
|
|
return nil, errNotAuthorized
|
|
}
|
|
}
|
|
|
|
resp := &openkv.CompleteRegistrationResponse{}
|
|
if in.RegistrationToken != registration.PSK {
|
|
resp.Error = &openkv.Error{
|
|
Code: 1,
|
|
Message: "Invalid PSK",
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
conn := &sharedmodel.Connection{
|
|
OrganisationId: registration.OrganisationId,
|
|
OrganisationIdSystem: registration.OrganisationIdSystem,
|
|
OrganisationDisplayName: registration.OrganisationDisplayName,
|
|
AuthConfig: registration.AuthConfig.Clone(),
|
|
}
|
|
|
|
srv.data.Create(conn)
|
|
|
|
registration.Status = sharedmodel.RegistrationStatusCompleted
|
|
srv.data.Save(registration)
|
|
|
|
resp.Success = true
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) ConfigService(
|
|
ctx context.Context, in *openkv.ConfigServiceRequest,
|
|
) (*openkv.ConfigServiceResponse, error) {
|
|
conn, err := requireConnection(srv.data, ctx)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
service := &sharedmodel.Service{}
|
|
if err := srv.data.Where("service_id = ?", in.Service).First(service).Error; err != nil {
|
|
return nil, fmt.Errorf("Invalid service: %v", service.ServiceID)
|
|
}
|
|
|
|
cnf := &sharedmodel.ServiceConfig{}
|
|
|
|
if err := srv.data.Where("connection_id = ? and service_id = ?", conn.ID, service.ID).First(cnf); err != nil {
|
|
cnf.ConnectionID = conn.ID
|
|
cnf.ServiceID = service.ID
|
|
}
|
|
|
|
log.Printf("Update service config %v for conn: %v", cnf.Service.Name, conn.ID)
|
|
|
|
cnf.Enabled = in.Enabled
|
|
|
|
cnf.PushProtocol = &sharedmodel.ProtocolConfig{
|
|
Protocol: in.Push.Protocol,
|
|
AuthConfig: sharedmodel.NewAuthConfig(in.Push.Auth),
|
|
}
|
|
|
|
cnf.PushProtocol.SetConfig(in.Push.Config)
|
|
// TODO actually init authdata
|
|
cnf.PushProtocol.AuthConfig.Raw = "1111"
|
|
|
|
cnf.FetchProtocol = &sharedmodel.ProtocolConfig{
|
|
Protocol: in.Fetch.Protocol,
|
|
AuthConfig: sharedmodel.NewAuthConfig(in.Fetch.Auth),
|
|
}
|
|
|
|
cnf.FetchProtocol.SetConfig(in.Fetch.Config)
|
|
// TODO actually init authdata
|
|
cnf.FetchProtocol.AuthConfig.Raw = "1111"
|
|
|
|
if cnf.ID == 0 {
|
|
if err := srv.data.Create(cnf).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
if err := srv.data.Save(cnf).Error; err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// If disabled unsubscribe all subscriptions
|
|
if !cnf.Enabled {
|
|
srv.data.Unscoped().Where("service_config_id = ?", cnf.ID).Delete(&sharedmodel.Subscription{})
|
|
}
|
|
|
|
resp := &openkv.ConfigServiceResponse{
|
|
Success: true,
|
|
Service: in.Service,
|
|
Enabled: in.Enabled,
|
|
Fetch: &openkv.ServiceConfig{
|
|
Protocol: "https://whiteboxsystems.nl/protospecs/whitebox-fetch/http",
|
|
Auth: &openkv.AuthConfig{
|
|
Method: openkv.AuthMethod_APIToken,
|
|
Config: &openkv.AuthConfig_ApiTokenConfig{&openkv.APITokenConfig{Token: "1111"}},
|
|
},
|
|
},
|
|
Push: &openkv.ServiceConfig{
|
|
Protocol: "https://whiteboxsystems.nl/protospecs/whitebox-push/http",
|
|
Auth: &openkv.AuthConfig{
|
|
Method: openkv.AuthMethod_APIToken,
|
|
Config: &openkv.AuthConfig_ApiTokenConfig{&openkv.APITokenConfig{Token: "1111"}},
|
|
},
|
|
},
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) UpdateSubscriptions(
|
|
ctx context.Context, in *openkv.UpdateSubscriptionsRequest,
|
|
) (*openkv.UpdateSubscriptionsResponse, error) {
|
|
conn, err := requireConnection(srv.data, ctx)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
subscriptionErrors := []*openkv.SubscriptionError{}
|
|
|
|
if err := srv.data.Transaction(func(tx *gorm.DB) error {
|
|
for idx, sd := range in.SubscriptionData {
|
|
subscription := &sharedmodel.Subscription{}
|
|
err := srv.data.Where(
|
|
"subject_external_id = ? and subject_external_id_system = ? and service_config_id = ?",
|
|
sd.Subject.ExternalId,
|
|
sd.Subject.ExternalIdSystem,
|
|
serviceConfig.ID,
|
|
).First(subscription).Error
|
|
|
|
if err != nil && err != gorm.ErrRecordNotFound {
|
|
return err
|
|
} else if err != nil && sd.Subscribe {
|
|
sub := &sharedmodel.Subscription{
|
|
SubjectExternalId: sd.Subject.ExternalId,
|
|
SubjectExternalIdSystem: sd.Subject.ExternalIdSystem,
|
|
SubjectName: sd.Subject.Name,
|
|
SubjectBirthdate: sd.Subject.Birthdate,
|
|
ServiceConfigID: serviceConfig.ID,
|
|
}
|
|
// TODO check if it is valid metadata for the specified protocol
|
|
sub.SetProtocolMeta(sd.ProtocolMeta)
|
|
|
|
if err := srv.data.Create(sub).Error; err != nil {
|
|
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
|
|
Index: int32(idx),
|
|
Error: &openkv.Error{
|
|
Code: openkv.ErrServiceException,
|
|
Message: fmt.Sprintf("Subject with id: %v (%v) could not be persisted; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err),
|
|
},
|
|
})
|
|
}
|
|
|
|
log.Printf("add subscription: %v", sd.Subject.ExternalId)
|
|
continue
|
|
} else if err != nil && !sd.Subscribe {
|
|
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
|
|
Index: int32(idx),
|
|
Error: &openkv.Error{
|
|
Code: openkv.ErrCodeAlreadySubscribed,
|
|
Message: fmt.Sprintf("Subject with id: %v (%v) already unsubscribed", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem),
|
|
},
|
|
})
|
|
continue
|
|
} else if !sd.Subscribe {
|
|
if err := srv.data.Unscoped().Delete(subscription).Error; err != nil {
|
|
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
|
|
Index: int32(idx),
|
|
Error: &openkv.Error{
|
|
Code: openkv.ErrServiceException,
|
|
Message: fmt.Sprintf("Subject with id: %v (%v) could not be removed; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err)},
|
|
})
|
|
}
|
|
log.Printf("delete subscription: %v", sd.Subject.ExternalId)
|
|
continue
|
|
}
|
|
|
|
subscription.SubjectExternalId = sd.Subject.ExternalId
|
|
subscription.SubjectExternalIdSystem = sd.Subject.ExternalIdSystem
|
|
subscription.SubjectName = sd.Subject.Name
|
|
subscription.SubjectBirthdate = sd.Subject.Birthdate
|
|
subscription.SetProtocolMeta(sd.ProtocolMeta)
|
|
|
|
if err := srv.data.Save(subscription).Error; err != nil {
|
|
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
|
|
Index: int32(idx),
|
|
Error: &openkv.Error{
|
|
Code: openkv.ErrServiceException,
|
|
Message: fmt.Sprintf("Subject with id: %v (%v) could not be updated; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err)},
|
|
})
|
|
continue
|
|
}
|
|
|
|
log.Printf("update subscription: %v", sd.Subject.ExternalId)
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &openkv.UpdateSubscriptionsResponse{
|
|
Success: true,
|
|
Errors: subscriptionErrors,
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (srv *OpenKVServer) ListSubscriptions(
|
|
ctx context.Context, in *openkv.ListSubscriptionsRequest,
|
|
) (*openkv.ListSubscriptionsResponse, error) {
|
|
conn, err := requireConnection(srv.data, ctx)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
subscriptions := []*sharedmodel.Subscription{}
|
|
|
|
srv.data.Where("service_config_id = ?", serviceConfig.ID).Find(&subscriptions)
|
|
|
|
subs := []*openkv.SubscriptionData{}
|
|
|
|
for _, s := range subscriptions {
|
|
meta := map[string]string{}
|
|
s.GetProtocolMeta(&meta)
|
|
subs = append(subs, &openkv.SubscriptionData{
|
|
Subject: &openkv.PatientMeta{
|
|
ExternalId: s.SubjectExternalId,
|
|
ExternalIdSystem: s.SubjectExternalIdSystem,
|
|
Name: s.SubjectName,
|
|
Birthdate: s.SubjectBirthdate,
|
|
},
|
|
ProtocolMeta: meta,
|
|
})
|
|
}
|
|
|
|
resp := &openkv.ListSubscriptionsResponse{
|
|
Success: true,
|
|
SubscriptionData: subs,
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func NewServer() *OpenKVServer {
|
|
return &OpenKVServer{}
|
|
}
|
|
|