You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
okapidemo/dvzaservice/openapisrv.go

636 lines
18 KiB

package main
import (
"context"
"crypto"
"fmt"
"log"
"github.com/gofrs/uuid"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"
"src.whiteboxsystems.nl/DECOZO/okapi"
"src.whiteboxsystems.nl/DECOZO/okapidemo/certgen"
"src.whiteboxsystems.nl/DECOZO/okapidemo/dvzaservice/model"
"src.whiteboxsystems.nl/DECOZO/okapidemo/sharedmodel"
)
// Sentinel errors returned by the authentication and service lookup helpers.
var (
	// errNotAuthorized is returned when the caller cannot be matched to a
	// known connection or registration.
	errNotAuthorized = fmt.Errorf("Not Authorized")
	// errInvalidService is returned when no service definition exists for
	// the requested service id.
	errInvalidService = fmt.Errorf("Invalid service")
	// errActiveServiceConfig is returned when the service exists but has no
	// configuration for the calling connection.
	errActiveServiceConfig = fmt.Errorf("Service not activated")
)
// OkAPIServer implements the OkAPI gRPC service on top of a GORM database.
type OkAPIServer struct {
// Embedded so that RPCs not overridden in this file fall back to the
// generated "unimplemented" handlers.
okapi.UnimplementedOkAPIServer
// data is the database handle; it is nil until LoadData has been called.
data *gorm.DB
}
// toStruct converts a plain Go map into a protobuf Struct.
//
// It panics when structpb.NewStruct rejects the map, so it must only be fed
// values that are known to be representable.
func toStruct(m map[string]interface{}) *structpb.Struct {
	converted, convErr := structpb.NewStruct(m)
	if convErr == nil {
		return converted
	}
	panic(convErr)
}
// NewOKAPIErr wraps an OKAPIError in a gRPC status error with code
// InvalidArgument, using the OKAPIError message as the status message and
// attaching the OKAPIError itself as a status detail.
//
// If the detail cannot be attached, the plain status (without details) is
// returned instead. WithDetails returns a nil status on failure, and calling
// Err on a nil status yields a nil error, which would turn a failure into an
// apparent success.
func NewOKAPIErr(err *okapi.OKAPIError) error {
	base := status.New(codes.InvalidArgument, err.Message)
	detailed, detailErr := base.WithDetails(err)
	if detailErr != nil {
		log.Printf("could not attach OKAPI error details: %v", detailErr)
		return base.Err()
	}
	return detailed.Err()
}
// LoadData opens the database at location via model.GetDB and stores the
// resulting handle on the server. The handle is stored even when an error is
// returned; callers must check the error before using the server.
func (srv *OkAPIServer) LoadData(location string) error {
	db, err := model.GetDB(location)
	srv.data = db
	return err
}
// requireConnection authenticates the caller of an RPC and returns the
// connection that belongs to it.
//
// The caller is identified by the SHA-256 JWK thumbprint of the public key of
// the first peer certificate presented over mTLS. That thumbprint (upper-case
// hex) is matched against the stored auth configuration of the connections.
//
// errNotAuthorized is returned when the context carries no peer information,
// the peer did not authenticate with a TLS client certificate, the thumbprint
// cannot be computed, or the lookup does not yield exactly one connection.
func requireConnection(db *gorm.DB, ctx context.Context) (*sharedmodel.Connection, error) {
	p, ok := peer.FromContext(ctx)
	if !ok {
		log.Printf("Invalid connection; no peer information in context")
		return nil, errNotAuthorized
	}

	// Without a client certificate there is no identity to look up. Reject
	// right away instead of querying with an empty fingerprint (or panicking
	// on an empty certificate list).
	mtls, ok := p.AuthInfo.(credentials.TLSInfo)
	if !ok || len(mtls.State.PeerCertificates) == 0 {
		log.Printf("Invalid connection; no client certificate presented")
		return nil, errNotAuthorized
	}

	item := mtls.State.PeerCertificates[0]
	log.Println("request certificate subject:", item.Subject)

	pk, err := certgen.PublicKeyToJWK(item.PublicKey)
	if err != nil {
		log.Printf("Invalid connection; cannot convert public key to JWK: %v", err)
		return nil, errNotAuthorized
	}

	fp, err := pk.Thumbprint(crypto.SHA256)
	if err != nil {
		log.Printf("Invalid connection; cannot compute JWK thumbprint: %v", err)
		return nil, errNotAuthorized
	}
	log.Printf("jwk alg: %v fp:%X\n", pk.Algorithm(), fp)

	method := okapi.XISAuthMethod_mTLS
	raw := fmt.Sprintf("%X", fp)

	connection := &sharedmodel.Connection{}
	res := db.Preload("AuthMethod").Raw(`
SELECT conn.*
FROM connections conn
JOIN xis_auth_configs a on conn.auth_config_id = a.id WHERE a.method = ? and a.raw = ?
`, method, raw).Scan(connection)
	if res.Error != nil {
		log.Printf("Invalid connection; lookup failed: %v", res.Error)
		return nil, errNotAuthorized
	}
	if res.RowsAffected != 1 {
		log.Printf("Invalid connection; rows found: %v;", res.RowsAffected)
		return nil, errNotAuthorized
	}

	return connection, nil
}
// requireService resolves the service configuration that the given connection
// has for the service identified by serviceID.
//
// It returns errInvalidService when no service definition with that id exists
// and errActiveServiceConfig when the connection has no configuration for it.
func requireService(db *gorm.DB, conn *sharedmodel.Connection, serviceID string) (*sharedmodel.ServiceConfig, error) {
	service := &sharedmodel.ServiceDefinition{}
	if err := db.Where("service_id = ?", serviceID).First(service).Error; err != nil {
		return nil, errInvalidService
	}

	srvConfig := &sharedmodel.ServiceConfig{}
	// Exactly one bind value per placeholder: a stray extra argument here
	// shifts the values and compares service_id against the wrong one.
	if err := db.Where("connection_id = ? and service_id = ?", conn.ID, service.ID).First(srvConfig).Error; err != nil {
		return nil, errActiveServiceConfig
	}

	return srvConfig, nil
}
// GetMetadata returns the static supplier and product description of this
// server. The request is not inspected and no authentication is performed.
func (srv *OkAPIServer) GetMetadata(
	ctx context.Context, in *okapi.GetMetadataRequest,
) (*okapi.GetMetadataResponse, error) {
	log.Printf("Got metadata request")

	metadata := new(okapi.GetMetadataResponse)
	metadata.SupplierDisplayName = "ACME inc."
	metadata.SupplierFormalName = "B.V. ACME inc."
	metadata.ProductName = "DVZA"

	return metadata, nil
}
// Register stores a pending registration for the organisation described in
// the request and returns the reference under which it was stored.
//
// A random reference and a short pre-shared key (the first six characters of
// a random UUID) are generated. The PSK is only written to the server log; the
// caller has to present it later in CompleteRegistration.
//
// Failures (random generation, invalid auth configuration, persistence) are
// reported as an OKAPI GenericException.
func (srv *OkAPIServer) Register(
	ctx context.Context, in *okapi.RegisterRequest,
) (*okapi.RegisterResponse, error) {
	// A failed UUID generation must not be ignored: it would leave a zero
	// UUID behind and therefore a predictable reference and PSK.
	ref, err := uuid.NewV4()
	if err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: fmt.Sprintf("generating registration reference: %v", err),
		})
	}
	psk, err := uuid.NewV4()
	if err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: fmt.Sprintf("generating registration PSK: %v", err),
		})
	}

	reg := &sharedmodel.Registration{
		Reference:                  ref.String(),
		OrganisationIdentifier:     in.OrganisationIdentifier,
		OrganisationIdentifierType: in.OrganisationIdentifierType,
		OrganisationDisplayName:    in.OrganisationDisplayName,
		PSK:                        psk.String()[0:6],
		Status:                     sharedmodel.RegistrationStatusPending,
	}

	if err := reg.SetAuthConfig(in.Auth); err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	if err := srv.data.Create(reg).Error; err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	log.Printf("Got registration request from %v; ref: %v; PSK: %v", reg.OrganisationDisplayName, reg.Reference, reg.PSK)

	return &okapi.RegisterResponse{
		Reference: ref.String(),
	}, nil
}
// ListServices returns every service definition known to the server. For the
// services the calling connection has enabled, it also returns the stored
// fetch and push callback configuration.
//
// The caller is authenticated with requireConnection. Database failures are
// reported as an OKAPI GenericException.
func (srv *OkAPIServer) ListServices(
	ctx context.Context, in *okapi.ListServicesRequest,
) (*okapi.ListServicesResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	presentServices := []sharedmodel.ServiceDefinition{}
	if err := srv.data.Find(&presentServices).Error; err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	// The Service association must be preloaded: the configurations are
	// indexed by the public service id, which lives on the related service
	// definition rather than on the configuration row itself.
	cnfs := []*sharedmodel.ServiceConfig{}
	if err := srv.data.Preload("Service").
		Preload("PushProtocol").Preload("PushProtocol.AuthConfig").
		Preload("FetchProtocol").Preload("FetchProtocol.AuthConfig").
		Where("connection_id = ?", conn.ID).
		Find(&cnfs).Error; err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	cnfMap := make(map[string]*sharedmodel.ServiceConfig, len(cnfs))
	for _, cnf := range cnfs {
		cnfMap[cnf.Service.ServiceID] = cnf
	}

	services := make([]*okapi.Service, 0, len(presentServices))
	for i := range presentServices {
		service := &presentServices[i]

		svc := &okapi.Service{
			Name:               service.Name,
			Description:        service.Description,
			Id:                 service.ServiceID,
			SubscriptionPolicy: service.SubscriptionPolicy,
			ConsentPolicy:      service.ConsentPolicy,
			FetchProtocols:     service.GetFetchProtocols(),
			PushProtocols:      service.GetPushProtocols(),
		}

		if conf, ok := cnfMap[service.ServiceID]; ok {
			svc.Enabled = true

			// An enabled service always carries both configurations; a
			// missing protocol is reported as an empty configuration.
			svc.FetchConfiguration = &okapi.CallbackConfiguration{}
			if conf.FetchProtocol != nil {
				svc.FetchConfiguration = &okapi.CallbackConfiguration{
					Protocol:      conf.FetchProtocol.Protocol,
					Configuration: conf.FetchProtocol.ConfigToOkapi(),
					Auth:          conf.FetchProtocol.AuthConfig.ToOkapi(),
				}
			}

			svc.PushConfiguration = &okapi.CallbackConfiguration{}
			if conf.PushProtocol != nil {
				svc.PushConfiguration = &okapi.CallbackConfiguration{
					Protocol:      conf.PushProtocol.Protocol,
					Configuration: conf.PushProtocol.ConfigToOkapi(),
					Auth:          conf.PushProtocol.AuthConfig.ToOkapi(),
				}
			}
		}

		services = append(services, svc)
	}

	return &okapi.ListServicesResponse{
		Services: services,
	}, nil
}
// CompleteRegistration turns a pending registration into a connection.
//
// The request must reference a pending registration, must be made over mTLS
// with the key whose thumbprint was registered, and must carry the PSK that
// was generated during Register as authorization token.
//
// errNotAuthorized is returned when the reference is unknown or the presented
// credentials do not match the registration. A wrong authorization token is
// reported as an OKAPI InvalidXISAuthConfiguration error. Persistence
// failures are reported as an OKAPI GenericException.
func (srv *OkAPIServer) CompleteRegistration(
	ctx context.Context, in *okapi.CompleteRegistrationRequest,
) (*okapi.CompleteRegistrationResponse, error) {
	registration := &sharedmodel.Registration{}
	if err := srv.data.Preload("AuthConfig").Where("reference = ? and status = ?", in.Reference, sharedmodel.RegistrationStatusPending).First(registration).Error; err != nil {
		log.Printf("Invalid ref")
		return nil, errNotAuthorized
	}

	var method okapi.XISAuthMethod
	var raw string

	if p, ok := peer.FromContext(ctx); ok {
		if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok {
			// A TLS connection without client certificate has an empty
			// certificate list; indexing it would panic.
			if len(mtls.State.PeerCertificates) == 0 {
				log.Printf("Invalid connection; no client certificate presented")
				return nil, errNotAuthorized
			}

			item := mtls.State.PeerCertificates[0]
			pk, err := certgen.PublicKeyToJWK(item.PublicKey)
			if err != nil {
				return nil, errNotAuthorized
			}

			fp, err := pk.Thumbprint(crypto.SHA256)
			if err != nil {
				return nil, errNotAuthorized
			}

			method = okapi.XISAuthMethod_mTLS
			raw = fmt.Sprintf("%X", fp)
		}
	}

	if expected := okapi.XISAuthMethod(registration.AuthConfig.Method); method != expected {
		log.Printf("Invalid method; eXpected: %v; got: %v", expected, method)
		return nil, errNotAuthorized
	}

	if raw != registration.AuthConfig.Raw {
		log.Printf("Invalid fp; eXpected: %v; got: %v", registration.AuthConfig.Raw, raw)
		return nil, errNotAuthorized
	}

	if in.AuthorizationToken != registration.PSK {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_InvalidXISAuthConfiguration,
			Message: "Invalid Public Key",
		})
	}

	conn := &sharedmodel.Connection{
		OrganisationIdentifier:     registration.OrganisationIdentifier,
		OrganisationIdentifierType: registration.OrganisationIdentifierType,
		OrganisationDisplayName:    registration.OrganisationDisplayName,
		AuthConfig:                 registration.AuthConfig.Clone(),
	}

	// Create the connection and mark the registration completed atomically,
	// so a failure cannot leave a completed registration without connection
	// (or the other way around) while reporting success to the caller.
	err := srv.data.Transaction(func(tx *gorm.DB) error {
		if err := tx.Create(conn).Error; err != nil {
			return err
		}
		registration.Status = sharedmodel.RegistrationStatusCompleted
		return tx.Save(registration).Error
	})
	if err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	return &okapi.CompleteRegistrationResponse{}, nil
}
// EnableService activates a service for the calling connection and stores
// the fetch protocol configuration supplied in the request.
//
// The caller is authenticated with requireConnection. An unknown service id
// yields a plain error. A service that is already active for the connection
// yields an OKAPI ServiceAlreadyActivated error. A request without fetch
// configuration, or a failing lookup, yields an OKAPI GenericException.
//
// The bearer token handed back to the caller is still hard-coded (see the
// TODO below).
func (srv *OkAPIServer) EnableService(
	ctx context.Context, in *okapi.EnableServiceRequest,
) (*okapi.EnableServiceResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	service := &sharedmodel.ServiceDefinition{}
	if err := srv.data.Where("service_id = ?", in.ServiceId).First(service).Error; err != nil {
		// Report the requested id; the looked-up definition is still empty.
		return nil, fmt.Errorf("Invalid service: %v", in.ServiceId)
	}

	if in.Fetch == nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: "Missing fetch configuration",
		})
	}

	// The lookup outcome is in the Error field. The *gorm.DB returned by
	// First is never nil, so comparing it to nil says nothing.
	existing := &sharedmodel.ServiceConfig{}
	lookupErr := srv.data.
		Where("connection_id = ? and service_id = ?", conn.ID, service.ID).
		First(existing).Error
	switch {
	case lookupErr == nil:
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_ServiceAlreadyActivated,
			Message: fmt.Sprintf("Service %v (%v) already activated", service.Name, service.ServiceID),
		})
	case lookupErr != gorm.ErrRecordNotFound:
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: lookupErr.Error(),
		})
	}

	// TODO negotiate and check if config is valid
	cnf := &sharedmodel.ServiceConfig{
		ConnectionID: conn.ID,
		ServiceID:    service.ID,
		FetchProtocol: &sharedmodel.ProtocolConfig{
			Protocol:   in.Fetch.Protocol,
			AuthConfig: sharedmodel.NewAuthConfig(in.Fetch.Auth),
		},
		PushProtocol: &sharedmodel.ProtocolConfig{
			AuthConfig: &sharedmodel.AuthConfig{},
		},
	}
	cnf.FetchProtocol.SetConfig(in.Fetch.Configuration)

	// TODO actually init authdata instead of hardcoded
	cnf.FetchProtocol.AuthConfig.Raw = "2222"

	if err := srv.data.Create(cnf).Error; err != nil {
		return nil, err
	}

	log.Printf("Enabled service %v for conn: %v", service.Name, conn.ID)

	return &okapi.EnableServiceResponse{
		ServiceId: in.ServiceId,
		Fetch: &okapi.CallbackConfiguration{
			Protocol:      cnf.FetchProtocol.Protocol,
			Configuration: cnf.FetchProtocol.ConfigToOkapi(),
			Auth: &okapi.ProtocolAuthConfiguration{
				Method:        sharedmodel.AuthMethodDecozoBearerToken,
				Configuration: toStruct(map[string]interface{}{"token": "2222"}),
			},
		},
		Push: &okapi.CallbackConfiguration{},
	}, nil
}
// DisableService deactivates a service for the calling connection by
// permanently deleting its service configuration and all subscriptions that
// belong to it.
//
// The caller is authenticated with requireConnection. An unknown service id
// yields a plain error. A service that is not active for the connection
// yields an OKAPI ServiceNotActive error. A failing delete yields an OKAPI
// GenericException.
func (srv *OkAPIServer) DisableService(
	ctx context.Context, in *okapi.DisableServiceRequest,
) (*okapi.DisableServiceResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	service := &sharedmodel.ServiceDefinition{}
	if err := srv.data.Where("service_id = ?", in.ServiceId).First(service).Error; err != nil {
		// Report the requested id; the looked-up definition is still empty.
		return nil, fmt.Errorf("Invalid service: %v", in.ServiceId)
	}

	// The lookup outcome is in the Error field. The *gorm.DB returned by
	// First is never nil, so comparing it to nil would reject every request.
	cnf := &sharedmodel.ServiceConfig{}
	if err := srv.data.
		Where("connection_id = ? and service_id = ?", conn.ID, service.ID).
		First(cnf).Error; err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_ServiceNotActive,
			Message: fmt.Sprintf("Service %v (%v) not yet activate", service.Name, service.ServiceID),
		})
	}

	// Remove the subscriptions and the configuration together, so a failure
	// half-way does not leave subscriptions without configuration or a
	// configuration that was reported as removed.
	err = srv.data.Transaction(func(tx *gorm.DB) error {
		// If disabled unsubscribe all subscriptions
		if err := tx.Unscoped().Where("service_config_id = ?", cnf.ID).Delete(&sharedmodel.Subscription{}).Error; err != nil {
			return err
		}
		// If disabled remove config
		return tx.Unscoped().Where("id = ?", cnf.ID).Delete(&sharedmodel.ServiceConfig{}).Error
	})
	if err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	log.Printf("Disable service %v for conn: %v", service.Name, conn.ID)

	return &okapi.DisableServiceResponse{}, nil
}
// CreateOrUpdatePatientRegistrations stores the patient registrations in the
// request as subscriptions of the caller's configuration for the given
// service. A registration whose id already exists for that configuration is
// updated; any other registration is created.
//
// The caller is authenticated with requireConnection and must have the
// service enabled (requireService). Registrations are processed one by one.
// A failure for one registration is recorded in the response as an OKAPI
// GenericException for that registration id and does not abort the others.
// Successfully stored registrations produce no result entry.
func (srv *OkAPIServer) CreateOrUpdatePatientRegistrations(
	ctx context.Context, in *okapi.CreateOrUpdatePatientRegistrationsRequest,
) (*okapi.CreateOrUpdatePatientRegistrationsResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
	if err != nil {
		return nil, err
	}

	results := []*okapi.PatientRegistrationResult{}
	// reject records a per-registration failure in the response.
	reject := func(id, message string) {
		results = append(results, &okapi.PatientRegistrationResult{
			Id: id,
			Error: &okapi.OKAPIError{
				Code:    okapi.OKAPIErrorCode_GenericException,
				Message: message,
			},
		})
	}

	for _, sd := range in.Registrations {
		// The subject and its identifier are required; without this check a
		// request that omits them would crash the handler.
		subject := sd.GetSubject()
		identifier := subject.GetIdentifier()
		if subject == nil || identifier == nil {
			reject(sd.Id, fmt.Sprintf("Registration with id: %v has no subject identifier", sd.Id))
			continue
		}

		// fill copies the request data onto a subscription; shared by the
		// create and the update path. Name and address are optional, hence
		// the nil-safe generated getters.
		fill := func(dst *sharedmodel.Subscription) {
			name := subject.GetName()
			address := subject.GetAddress()

			dst.SubjectExternalId = identifier.GetValue()
			dst.SubjectExternalIdSystem = identifier.GetType()
			dst.SubjectDisplayName = name.GetDisplay()
			dst.SubjectGiven = name.GetGiven()
			dst.SubjectOwnName = name.GetOwnName()
			dst.SubjectOwnNamePrefix = name.GetOwnNamePrefix()
			dst.SubjectPartnerName = name.GetPartnerName()
			dst.SubjectPartnerNamePrefix = name.GetPartnerNamePrefix()
			dst.SubjectBirthdate = subject.GetBirthdate()
			dst.SubjectAddressStreet = address.GetStreet()
			dst.SubjectAddressStreetNumber = address.GetStreetNumber()
			dst.SubjectAddressPostalCode = address.GetPostalCode()
			dst.SubjectAddressCity = address.GetCity()
			dst.SubjectAddressCountry = address.GetCountry()
			dst.ServiceConfigID = serviceConfig.ID
			// TODO check if it is valid metadata for the specified protocol
			dst.SetProtocolMeta(sd.CallbackProtocolData)
		}

		// Exactly one bind value per placeholder: a stray extra argument
		// shifts the values and compares service_config_id with the wrong one.
		subscription := &sharedmodel.Subscription{}
		lookupErr := srv.data.Where(
			"id = ? and service_config_id = ?",
			sd.Id,
			serviceConfig.ID,
		).First(subscription).Error

		switch {
		case lookupErr == nil:
			// Update
			fill(subscription)
			if err := srv.data.Save(subscription).Error; err != nil {
				reject(sd.Id, fmt.Sprintf("Registration with id: %v could not be updated; %v", sd.Id, err))
				continue
			}
			log.Printf("update subscription: %v", identifier.GetValue())

		case lookupErr == gorm.ErrRecordNotFound:
			// Create
			sub := &sharedmodel.Subscription{ID: sd.Id}
			fill(sub)
			if err := srv.data.Create(sub).Error; err != nil {
				reject(sd.Id, fmt.Sprintf("Registration with id: %v could not be persisted; %v", sd.Id, err))
				continue
			}
			log.Printf("add subscription: %v", identifier.GetValue())

		default:
			reject(sd.Id, fmt.Sprintf("Registration with id: %v could not be queried; %v", sd.Id, lookupErr))
		}
	}

	return &okapi.CreateOrUpdatePatientRegistrationsResponse{
		Results: results,
	}, nil
}
// RemovePatientRegistrations permanently deletes the subscriptions with the
// given ids from the caller's configuration for the given service.
//
// The caller is authenticated with requireConnection and must have the
// service enabled (requireService). Ids are processed one by one. A failure
// is recorded in the response for that id and does not abort the others: an
// id that is not found is reported as AlreadyUnsubscribed, and query or
// delete failures are reported as GenericException. Successful removals
// produce no result entry.
func (srv *OkAPIServer) RemovePatientRegistrations(
	ctx context.Context, in *okapi.RemovePatientRegistrationsRequest,
) (*okapi.RemovePatientRegistrationsResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
	if err != nil {
		return nil, err
	}

	failures := []*okapi.PatientRegistrationResult{}

	for _, id := range in.Registrations {
		found := &sharedmodel.Subscription{}
		lookupErr := srv.data.
			Where("id = ? and service_config_id = ?", id, serviceConfig.ID).
			First(found).Error

		switch {
		case lookupErr != nil && lookupErr != gorm.ErrRecordNotFound:
			// The lookup itself failed.
			failures = append(failures, &okapi.PatientRegistrationResult{
				Id: id,
				Error: &okapi.OKAPIError{
					Code:    okapi.OKAPIErrorCode_GenericException,
					Message: fmt.Sprintf("Registration with id: %v could not be queried; %v", id, lookupErr),
				},
			})
			continue
		case lookupErr != nil:
			// Nothing stored under this id for this configuration.
			failures = append(failures, &okapi.PatientRegistrationResult{
				Id: id,
				Error: &okapi.OKAPIError{
					Code:    okapi.OKAPIErrorCode_AlreadyUnsubscribed,
					Message: fmt.Sprintf("Registration with id: %v already unsubscribed; %v", id, lookupErr),
				},
			})
			continue
		}

		deleteErr := srv.data.Unscoped().Delete(found).Error
		if deleteErr != nil {
			failures = append(failures, &okapi.PatientRegistrationResult{
				Id: id,
				Error: &okapi.OKAPIError{
					Code:    okapi.OKAPIErrorCode_GenericException,
					Message: fmt.Sprintf("Registration with id: %v could not be removed; %v", id, deleteErr),
				},
			})
			continue
		}

		log.Printf("remove patient registration: %v", id)
	}

	return &okapi.RemovePatientRegistrationsResponse{
		Results: failures,
	}, nil
}
// ListPatientRegistrations returns all subscriptions stored for the caller's
// configuration of the given service, converted back to OKAPI patient
// registration data.
//
// The caller is authenticated with requireConnection and must have the
// service enabled (requireService). A failing database query is reported as
// an OKAPI GenericException rather than as an empty list.
func (srv *OkAPIServer) ListPatientRegistrations(
	ctx context.Context, in *okapi.ListPatientRegistrationsRequest,
) (*okapi.ListPatientRegistrationsResponse, error) {
	conn, err := requireConnection(srv.data, ctx)
	if err != nil {
		return nil, err
	}

	serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
	if err != nil {
		return nil, err
	}

	subscriptions := []*sharedmodel.Subscription{}
	if err := srv.data.Where("service_config_id = ?", serviceConfig.ID).Find(&subscriptions).Error; err != nil {
		return nil, NewOKAPIErr(&okapi.OKAPIError{
			Code:    okapi.OKAPIErrorCode_GenericException,
			Message: err.Error(),
		})
	}

	subs := make([]*okapi.PatientRegistrationData, 0, len(subscriptions))
	for _, s := range subscriptions {
		meta := map[string]interface{}{}
		s.GetProtocolMeta(&meta)

		subs = append(subs, &okapi.PatientRegistrationData{
			Id: fmt.Sprintf("%v", s.ID),
			Subject: &okapi.PatientMeta{
				Identifier: &okapi.Identifier{
					Type:  s.SubjectExternalIdSystem,
					Value: s.SubjectExternalId,
				},
				Name: &okapi.Name{
					Display:           s.SubjectDisplayName,
					Given:             s.SubjectGiven,
					OwnName:           s.SubjectOwnName,
					OwnNamePrefix:     s.SubjectOwnNamePrefix,
					PartnerName:       s.SubjectPartnerName,
					PartnerNamePrefix: s.SubjectPartnerNamePrefix,
				},
				Birthdate: s.SubjectBirthdate,
				Address: &okapi.Address{
					Street:       s.SubjectAddressStreet,
					StreetNumber: s.SubjectAddressStreetNumber,
					PostalCode:   s.SubjectAddressPostalCode,
					City:         s.SubjectAddressCity,
					Country:      s.SubjectAddressCountry,
				},
			},
			// toStruct panics on values that cannot be represented in a
			// protobuf Struct.
			CallbackProtocolData: toStruct(meta),
		})
	}

	return &okapi.ListPatientRegistrationsResponse{
		PatientRegistrationData: subs,
	}, nil
}
// NewServer returns an OkAPIServer without a database attached; LoadData has
// to be called before it can serve requests that touch the database.
func NewServer() *OkAPIServer {
	server := new(OkAPIServer)
	return server
}