Update dvza service to implement new okapi spec

master
Bas Kloosterman 2 years ago
parent e8811adf99
commit e49e768454
  1. 4
      dvzaservice/app/src/Connection.js
  2. 2
      dvzaservice/app/src/Connections.js
  3. 2
      dvzaservice/app/src/Registrations.js
  4. 62
      dvzaservice/main.go
  5. 25
      dvzaservice/model/db.go
  6. 682
      dvzaservice/openapisrv.go
  7. 32
      dvzaservice/srv.go

@ -19,7 +19,7 @@ const Subscriptions = ({service}) => {
<tbody>
{service.Subscriptions.map(x => {
return (<tr key={x.ID}>
<td>{x.SubjectName}</td>
<td>{x.SubjectDisplayName}</td>
<td>{x.SubjectExternalId}</td>
<td>{x.SubjectBirthdate}</td>
<td><Link to={`/connecties/${service.ConnectionID}/${service.ID}/${x.ID}`}>Bekijk dossier</Link></td>
@ -45,7 +45,7 @@ const Connection = () => {
return (
<div>
<h1 className="t-page-header">Verbinding</h1>
{(connection && service) ? (<h2>{connection.OrganisationDisplayName} ({connection.OrganisationId}) | {service.Service.Name}</h2>) : null}
{(connection && service) ? (<h2>{connection.OrganisationDisplayName} ({connection.OrganisationIdentifier}) | {service.Service.Name}</h2>) : null}
{<Subscriptions service={service}/>}
</div>
);

@ -27,7 +27,7 @@ const App = () => {
<tbody>
{connections.map(x => {
return (<tr key={x.ID}>
<td>{x.OrganisationId}</td>
<td>{x.OrganisationIdentifier}</td>
<td>{x.OrganisationDisplayName}</td>
<td>{x.Services.length ? x.Services.map((s) => {
return <span key={s.Service.ID} style={{marginRight: 10}}><Link to={`/connecties/${x.ID}/${s.ID}`} >{s.Service.Name} ({subscriptionsCount(s)})</Link></span>

@ -20,7 +20,7 @@ const App = () => {
</tr>
{registrations.map(x => {
return (<tr key={x.ID}>
<td>{x.OrganisationId}</td>
<td>{x.OrganisationIdentifier}</td>
<td>{x.OrganisationDisplayName}</td>
<td>{x.Reference}</td>
<td>{x.PSK}</td>

@ -2,6 +2,8 @@ package main
import (
"context"
"crypto/tls"
"io/ioutil"
"log"
"net"
"os"
@ -9,12 +11,66 @@ import (
"sync"
"google.golang.org/grpc"
"whiteboxsystems.nl/openkvpoc/openkv"
"google.golang.org/grpc/credentials"
"src.whiteboxsystems.nl/DECOZO/okapi"
"whiteboxsystems.nl/okapidemo/certgen"
)
var rpcAddr = "0.0.0.0:9999"
var uiAddr = "0.0.0.0:9095"
func loadCert() *tls.Certificate {
_, err := os.Stat("certs/client.crt")
if err != nil {
_, _, certPem, keyPem, err := certgen.GenCert("dvza", "dvza")
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
if err := ioutil.WriteFile("certs/client.crt", []byte(certPem), 0600); err != nil {
panic(err)
}
if err := ioutil.WriteFile("certs/client.key", []byte(keyPem), 0600); err != nil {
panic(err)
}
}
certificate, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
if err != nil {
panic("Load client certification failed: " + err.Error())
}
return &certificate
}
func loadKeyPair() credentials.TransportCredentials {
certificate := loadCert()
// data, err := ioutil.ReadFile("certs/ca.crt")
// if err != nil {
// panic("failed to load CA file: " + err.Error())
// }
// capool := x509.NewCertPool()
// if !capool.AppendCertsFromPEM(data) {
// panic("can't add ca cert")
// }
tlsConfig := &tls.Config{
ClientAuth: tls.RequestClientCert,
Certificates: []tls.Certificate{*certificate},
// ClientCAs: capool,
}
return credentials.NewTLS(tlsConfig)
}
func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt)
@ -23,7 +79,7 @@ func main() {
openapisrv := NewServer()
openapisrv.LoadData("./data/data.db")
opts := []grpc.ServerOption{
// grpc.UnaryInterceptor(openapisrv.EnsureValidModule),
grpc.Creds(loadKeyPair()),
}
grpcServer := grpc.NewServer(opts...)
@ -34,7 +90,7 @@ func main() {
log.Fatalf("failed to listen: %v", err)
}
openkv.RegisterOpenKVServer(grpcServer, openapisrv)
okapi.RegisterOkAPIServer(grpcServer, openapisrv)
log.Printf("RPC Listening on %v", rpcAddr)
wg.Add(1)
grpcServer.Serve(lis)

@ -3,12 +3,15 @@ package model
import (
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"whiteboxsystems.nl/openkvpoc/openkv"
"whiteboxsystems.nl/openkvpoc/sharedmodel"
"gorm.io/gorm/logger"
"src.whiteboxsystems.nl/DECOZO/okapi"
"whiteboxsystems.nl/okapidemo/sharedmodel"
)
func GetDB(location string) (*gorm.DB, error) {
db, err := gorm.Open(sqlite.Open(location), &gorm.Config{})
db, err := gorm.Open(sqlite.Open(location), &gorm.Config{
Logger: logger.Default.LogMode(logger.Info),
})
if err != nil {
return nil, err
}
@ -16,31 +19,33 @@ func GetDB(location string) (*gorm.DB, error) {
// Migrate the schema
db.AutoMigrate(&sharedmodel.Registration{})
db.AutoMigrate(&sharedmodel.Connection{})
db.AutoMigrate(&sharedmodel.Service{})
db.AutoMigrate(&sharedmodel.ServiceDefinition{})
db.AutoMigrate(&sharedmodel.AuthConfig{})
db.AutoMigrate(&sharedmodel.ProtocolConfig{})
db.AutoMigrate(&sharedmodel.ServiceConfig{})
db.AutoMigrate(&sharedmodel.Subscription{})
var cnt int64
db.Model(&sharedmodel.Service{}).Count(&cnt)
db.Model(&sharedmodel.ServiceDefinition{}).Count(&cnt)
if cnt == 0 {
db.Create(&sharedmodel.Service{
db.Create(&sharedmodel.ServiceDefinition{
Name: "MedMij DVZA",
Description: "MedMij compliant PGO koppeling",
SubscriptionPolicy: openkv.SubscriptionPolicy_optout,
ConsentPolicy: openkv.ConsentPolicy_presumed,
SubscriptionPolicy: okapi.SubscriptionPolicy_optout,
ConsentPolicy: okapi.ConsentPolicy_presumed,
ServiceID: "acme:dvza",
FetchProtocols: sharedmodel.ProtocolArray{
{
"https://hl7.nl/fhir",
[]openkv.AuthMethod{openkv.AuthMethod_APIToken},
Protocol: "https://hl7.nl/fhir",
AuthMethods: []string{"BearerToken"},
},
},
PushProtocols: sharedmodel.ProtocolArray{},
})
}
db.Exec("PRAGMA foreign_keys = ON", nil)
return db, nil
}

@ -2,134 +2,230 @@ package main
import (
"context"
"crypto"
"fmt"
"log"
"github.com/gofrs/uuid"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/structpb"
"gorm.io/gorm"
"whiteboxsystems.nl/openkvpoc/dvzaservice/model"
"whiteboxsystems.nl/openkvpoc/openkv"
"whiteboxsystems.nl/openkvpoc/sharedmodel"
"src.whiteboxsystems.nl/DECOZO/okapi"
"whiteboxsystems.nl/okapidemo/certgen"
"whiteboxsystems.nl/okapidemo/dvzaservice/model"
"whiteboxsystems.nl/okapidemo/sharedmodel"
)
var errNotAuthorized = fmt.Errorf("Not Authorized")
var errInvalidService = fmt.Errorf("Invalid service")
var errActiveServiceConfig = fmt.Errorf("Service not activated")
type OpenKVServer struct {
openkv.UnimplementedOpenKVServer
type OkAPIServer struct {
okapi.UnimplementedOkAPIServer
data *gorm.DB
}
func (srv *OpenKVServer) LoadData(location string) error {
func toStruct(m map[string]interface{}) *structpb.Struct {
s, err := structpb.NewStruct(m)
if err != nil {
panic(err)
}
return s
}
func NewOKAPIErr(err *okapi.OKAPIError) error {
s := status.New(codes.InvalidArgument, err.Message)
s, _ = s.WithDetails(err)
return s.Err()
}
func (srv *OkAPIServer) LoadData(location string) error {
var err error
srv.data, err = model.GetDB(location)
return err
}
func requireConnection(db *gorm.DB, ctx context.Context) (*sharedmodel.Connection, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Printf("No metadata")
return nil, errNotAuthorized
var method okapi.XISAuthMethod
var raw string
if p, ok := peer.FromContext(ctx); ok {
if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok {
item := mtls.State.PeerCertificates[0]
log.Println("request certificate subject:", item.Subject)
pk, err := certgen.PublicKeyToJWK(item.PublicKey)
if err != nil {
return nil, errNotAuthorized
}
log.Println("jwk", pk.Algorithm(), err)
fp, err := pk.Thumbprint(crypto.SHA256)
if err != nil {
return nil, errNotAuthorized
}
log.Printf("jwk err: %v fp:%X\n", err, fp)
method = okapi.XISAuthMethod_mTLS
raw = fmt.Sprintf("%X", fp)
}
}
connection := &sharedmodel.Connection{}
if a, ok := md["authorization"]; !ok {
log.Printf("No token provided")
if rowsFound := db.Preload("AuthMethod").Raw(`
SELECT conn.*
FROM connections conn
JOIN xis_auth_configs a on conn.auth_config_id = a.id WHERE a.method = ? and a.raw = ?
`, method, raw).Scan(connection).RowsAffected; rowsFound != 1 {
log.Printf("Invalid connection; rows found: %v;", rowsFound)
return nil, errNotAuthorized
} else {
if err := db.Preload("AuthMethod").Raw(`
SELECT conn.*
FROM connections conn
JOIN auth_configs a on conn.auth_config_id = a.id WHERE a.method = ? and a.raw = ?
`, openkv.AuthMethod_APIToken, a[0]).Scan(connection).Error; err != nil {
log.Printf("Invalid token; err: %v;", err)
return nil, errNotAuthorized
}
}
return connection, nil
}
func requireService(db *gorm.DB, conn *sharedmodel.Connection, serviceID string) (*sharedmodel.ServiceConfig, error) {
service := &sharedmodel.Service{}
service := &sharedmodel.ServiceDefinition{}
if err := db.Where("service_id = ?", serviceID).First(service).Error; err != nil {
return nil, errInvalidService
}
srvConfig := &sharedmodel.ServiceConfig{}
if err := db.Where("connection_id = ? and enabled = ? and service_id = ?", conn.ID, true, service.ID).First(srvConfig).Error; err != nil {
if err := db.Where("connection_id = ? and service_id = ?", conn.ID, true, service.ID).First(srvConfig).Error; err != nil {
return nil, errActiveServiceConfig
}
return srvConfig, nil
}
func (srv *OpenKVServer) GetMetadata(
ctx context.Context, in *openkv.GetMetadataRequest,
) (*openkv.GetMetadataResponse, error) {
func (srv *OkAPIServer) GetMetadata(
ctx context.Context, in *okapi.GetMetadataRequest,
) (*okapi.GetMetadataResponse, error) {
log.Printf("Got metadata request")
services := []*openkv.ServiceDefinition{}
presentServices := []sharedmodel.Service{}
srv.data.Find(&presentServices)
for _, service := range presentServices {
services = append(services, &openkv.ServiceDefinition{
Name: service.Name,
Description: service.Description,
Id: service.ServiceID,
SubscriptionPolicy: service.SubscriptionPolicy,
ConsentPolicy: service.ConsentPolicy,
FetchProtocols: service.GetFetchProtocols(),
PushProtocols: service.GetPushProtocols(),
})
}
resp := &openkv.GetMetadataResponse{
Supplier: "ACME inc.",
System: "DVZA",
Services: services,
Success: true,
resp := &okapi.GetMetadataResponse{
SupplierDisplayName: "ACME inc.",
SupplierFormalName: "B.V. ACME inc.",
ProductName: "DVZA",
}
return resp, nil
}
func (srv *OpenKVServer) Register(
ctx context.Context, in *openkv.RegisterRequest,
) (*openkv.RegisterResponse, error) {
func (srv *OkAPIServer) Register(
ctx context.Context, in *okapi.RegisterRequest,
) (*okapi.RegisterResponse, error) {
ref, _ := uuid.NewV4()
psk, _ := uuid.NewV4()
reg := &sharedmodel.Registration{
Reference: ref.String(),
OrganisationId: in.OrganisationId,
OrganisationIdSystem: in.OrganisationIdSystem,
OrganisationDisplayName: in.OrganisationDisplayName,
PSK: psk.String()[0:6],
Status: sharedmodel.RegistrationStatusPending,
Reference: ref.String(),
OrganisationIdentifier: in.OrganisationIdentifier,
OrganisationIdentifierType: in.OrganisationIdentifierType,
OrganisationDisplayName: in.OrganisationDisplayName,
PSK: psk.String()[0:6],
Status: sharedmodel.RegistrationStatusPending,
}
reg.SetAuthConfig(in.Auth)
if err := reg.SetAuthConfig(in.Auth); err != nil {
return nil, NewOKAPIErr(&okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: err.Error(),
})
}
srv.data.Create(reg)
if err := srv.data.Create(reg).Error; err != nil {
return nil, NewOKAPIErr(&okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: err.Error(),
})
}
resp := &openkv.RegisterResponse{
resp := &okapi.RegisterResponse{
Reference: ref.String(),
Success: true,
}
log.Printf("Got registration request from %v; ref: %v; PSK: %v", reg.OrganisationDisplayName, reg.Reference, reg.PSK)
return resp, nil
}
func (srv *OpenKVServer) CompleteRegistration(
ctx context.Context, in *openkv.CompleteRegistrationRequest,
) (*openkv.CompleteRegistrationResponse, error) {
func (srv *OkAPIServer) ListServices(
ctx context.Context, in *okapi.ListServicesRequest,
) (*okapi.ListServicesResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
return nil, err
}
services := []*okapi.Service{}
presentServices := []sharedmodel.ServiceDefinition{}
srv.data.Find(&presentServices)
cnfs := []*sharedmodel.ServiceConfig{}
srv.data.Preload("PushProtocol").Preload("PushProtocol.AuthConfig").
Preload("FetchProtocol").Preload("FetchProtocol.AuthConfig").
Where("connection_id = ?", conn.ID).
Find(&cnfs)
cnfMap := map[string]*sharedmodel.ServiceConfig{}
for _, cnf := range cnfs {
cnfMap[cnf.Service.ServiceID] = cnf
}
for _, service := range presentServices {
srv := &okapi.Service{
Name: service.Name,
Description: service.Description,
Id: service.ServiceID,
SubscriptionPolicy: service.SubscriptionPolicy,
ConsentPolicy: service.ConsentPolicy,
FetchProtocols: service.GetFetchProtocols(),
PushProtocols: service.GetPushProtocols(),
}
if conf, ok := cnfMap[service.ServiceID]; ok {
srv.Enabled = true
if conf.FetchProtocol != nil {
srv.FetchConfiguration = &okapi.CallbackConfiguration{
Protocol: conf.FetchProtocol.Protocol,
Configuration: conf.FetchProtocol.ConfigToOkapi(),
Auth: conf.FetchProtocol.AuthConfig.ToOkapi(),
}
} else {
srv.FetchConfiguration = &okapi.CallbackConfiguration{}
}
if conf.PushProtocol != nil {
srv.PushConfiguration = &okapi.CallbackConfiguration{
Protocol: conf.PushProtocol.Protocol,
Configuration: conf.PushProtocol.ConfigToOkapi(),
Auth: conf.PushProtocol.AuthConfig.ToOkapi(),
}
} else {
srv.PushConfiguration = &okapi.CallbackConfiguration{}
}
}
services = append(services, srv)
}
return &okapi.ListServicesResponse{
Services: services,
}, nil
}
func (srv *OkAPIServer) CompleteRegistration(
ctx context.Context, in *okapi.CompleteRegistrationRequest,
) (*okapi.CompleteRegistrationResponse, error) {
registration := &sharedmodel.Registration{}
if err := srv.data.Preload("AuthConfig").Where("reference = ? and status = ?", in.Reference, sharedmodel.RegistrationStatusPending).First(registration).Error; err != nil {
@ -137,39 +233,51 @@ func (srv *OpenKVServer) CompleteRegistration(
return nil, errNotAuthorized
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
log.Printf("No metadata")
return nil, errNotAuthorized
var method okapi.XISAuthMethod
var raw string
if p, ok := peer.FromContext(ctx); ok {
if mtls, ok := p.AuthInfo.(credentials.TLSInfo); ok {
item := mtls.State.PeerCertificates[0]
pk, err := certgen.PublicKeyToJWK(item.PublicKey)
if err != nil {
return nil, errNotAuthorized
}
fp, err := pk.Thumbprint(crypto.SHA256)
if err != nil {
return nil, errNotAuthorized
}
method = okapi.XISAuthMethod_mTLS
raw = fmt.Sprintf("%X", fp)
}
}
// The keys within metadata.MD are normalized to lowercase.
// See: https://godoc.org/google.golang.org/grpc/metadata#New
if a, ok := md["authorization"]; !ok {
log.Printf("No token provided")
if method != okapi.XISAuthMethod(registration.AuthConfig.Method) {
log.Printf("Invalid method; eXpected: %v; got: %v", registration.AuthConfig.Raw, method)
return nil, errNotAuthorized
} else {
if a[0] != registration.AuthConfig.Raw {
log.Printf("Invalid token; eXpected: %v; got: %v", registration.AuthConfig.Raw, a[0])
return nil, errNotAuthorized
}
}
resp := &openkv.CompleteRegistrationResponse{}
if in.RegistrationToken != registration.PSK {
resp.Error = &openkv.Error{
Code: 1,
Message: "Invalid PSK",
}
if raw != registration.AuthConfig.Raw {
log.Printf("Invalid fp; eXpected: %v; got: %v", registration.AuthConfig.Raw, raw)
return nil, errNotAuthorized
}
return resp, nil
resp := &okapi.CompleteRegistrationResponse{}
if in.AuthorizationToken != registration.PSK {
return resp, NewOKAPIErr(&okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_InvalidXISAuthConfiguration,
Message: "Invalid Public Key",
})
}
conn := &sharedmodel.Connection{
OrganisationId: registration.OrganisationId,
OrganisationIdSystem: registration.OrganisationIdSystem,
OrganisationDisplayName: registration.OrganisationDisplayName,
AuthConfig: registration.AuthConfig.Clone(),
OrganisationIdentifier: registration.OrganisationIdentifier,
OrganisationIdentifierType: registration.OrganisationIdentifierType,
OrganisationDisplayName: registration.OrganisationDisplayName,
AuthConfig: registration.AuthConfig.Clone(),
}
srv.data.Create(conn)
@ -177,86 +285,113 @@ func (srv *OpenKVServer) CompleteRegistration(
registration.Status = sharedmodel.RegistrationStatusCompleted
srv.data.Save(registration)
resp.Success = true
return resp, nil
}
func (srv *OpenKVServer) ConfigService(
ctx context.Context, in *openkv.ConfigServiceRequest,
) (*openkv.ConfigServiceResponse, error) {
func (srv *OkAPIServer) EnableService(
ctx context.Context, in *okapi.EnableServiceRequest,
) (*okapi.EnableServiceResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
return nil, err
}
service := &sharedmodel.Service{}
if err := srv.data.Where("service_id = ?", in.Service).First(service).Error; err != nil {
service := &sharedmodel.ServiceDefinition{}
if err := srv.data.Where("service_id = ?", in.ServiceId).First(service).Error; err != nil {
return nil, fmt.Errorf("Invalid service: %v", service.ServiceID)
}
cnf := &sharedmodel.ServiceConfig{}
if err := srv.data.Where("connection_id = ? and service_id = ?", conn.ID, service.ID).First(cnf); err != nil {
cnf.ConnectionID = conn.ID
cnf.ServiceID = service.ID
}
log.Printf("Update service config %v for conn: %v", cnf.Service.Name, conn.ID)
cnf.Enabled = in.Enabled
// TODO actually init authdata
cnf.PushProtocol = &sharedmodel.ProtocolConfig{
Protocol: in.Push.Protocol,
AuthConfig: sharedmodel.NewAuthConfig(in.Push.Auth),
if err := srv.data.Preload("PushProtocol").Preload("PushProtocol.AuthConfig").
Preload("FetchProtocol").Preload("FetchProtocol.AuthConfig").
Where("connection_id = ? and service_id = ?", conn.ID, service.ID).
First(cnf); err == nil {
return nil, NewOKAPIErr(&okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_ServiceAlreadyActivated,
Message: fmt.Sprintf("Service %v (%v) already activated", service.Name, service.ServiceID),
})
}
cnf.PushProtocol.SetConfig(in.Push.Config)
cnf.PushProtocol.AuthConfig.Raw = "2222"
cnf.ConnectionID = conn.ID
cnf.ServiceID = service.ID
// TODO negotiate and check if config is valid
cnf.FetchProtocol = &sharedmodel.ProtocolConfig{
Protocol: in.Fetch.Protocol,
AuthConfig: sharedmodel.NewAuthConfig(in.Fetch.Auth),
}
cnf.FetchProtocol.SetConfig(in.Fetch.Config)
cnf.FetchProtocol.SetConfig(in.Fetch.Configuration)
// TODO actually init authdata instead of hardcoded
cnf.FetchProtocol.AuthConfig.Raw = "2222"
if cnf.ID == 0 {
if err := srv.data.Create(cnf).Error; err != nil {
return nil, err
}
} else {
if err := srv.data.Save(cnf).Error; err != nil {
return nil, err
}
cnf.PushProtocol = &sharedmodel.ProtocolConfig{
AuthConfig: &sharedmodel.AuthConfig{},
}
// If disabled unsubscribe all subscriptions
if !cnf.Enabled {
srv.data.Unscoped().Where("service_config_id = ?", cnf.ID).Delete(&sharedmodel.Subscription{})
}
resp := &openkv.ConfigServiceResponse{
Success: true,
Service: in.Service,
Enabled: in.Enabled,
Fetch: &openkv.ServiceConfig{
Protocol: "https://hl7.nl/fhir",
Auth: &openkv.AuthConfig{
Method: openkv.AuthMethod_APIToken,
Config: &openkv.AuthConfig_ApiTokenConfig{&openkv.APITokenConfig{Token: "2222"}},
if err := srv.data.Create(cnf).Error; err != nil {
return nil, err
}
log.Printf("Enabled service %v for conn: %v", service.Name, conn.ID)
resp := &okapi.EnableServiceResponse{
ServiceId: in.ServiceId,
Fetch: &okapi.CallbackConfiguration{
Protocol: cnf.FetchProtocol.Protocol,
Configuration: cnf.FetchProtocol.ConfigToOkapi(),
Auth: &okapi.ProtocolAuthConfiguration{
Method: "BearerToken",
Configuration: toStruct(map[string]interface{}{"token": "2222"}),
},
},
Push: &okapi.CallbackConfiguration{},
}
return resp, nil
}
func (srv *OpenKVServer) UpdateSubscriptions(
ctx context.Context, in *openkv.UpdateSubscriptionsRequest,
) (*openkv.UpdateSubscriptionsResponse, error) {
func (srv *OkAPIServer) DisableService(
ctx context.Context, in *okapi.DisableServiceRequest,
) (*okapi.DisableServiceResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
return nil, err
}
service := &sharedmodel.ServiceDefinition{}
if err := srv.data.Where("service_id = ?", in.ServiceId).First(service).Error; err != nil {
return nil, fmt.Errorf("Invalid service: %v", service.ServiceID)
}
cnf := &sharedmodel.ServiceConfig{}
if err := srv.data.Preload("PushProtocol").Preload("PushProtocol.AuthConfig").
Preload("FetchProtocol").Preload("FetchProtocol.AuthConfig").
Where("connection_id = ? and service_id = ?", conn.ID, service.ID).
First(cnf); err != nil {
return nil, NewOKAPIErr(&okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_ServiceNotActive,
Message: fmt.Sprintf("Service %v (%v) not yet activate", service.Name, service.ServiceID),
})
}
// If disabled unsubscribe all subscriptions
srv.data.Unscoped().Where("service_config_id = ?", cnf.ID).Delete(&sharedmodel.Subscription{})
// If disabled remove config
srv.data.Unscoped().Where("id = ?", cnf.ID).Delete(&sharedmodel.ServiceConfig{})
log.Printf("Disable service for conn: %v", cnf.Service.Name, conn.ID)
return &okapi.DisableServiceResponse{}, nil
}
func (srv *OkAPIServer) CreateOrUpdatePatientRegistrations(
ctx context.Context, in *okapi.CreateOrUpdatePatientRegistrationsRequest,
) (*okapi.CreateOrUpdatePatientRegistrationsResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
@ -269,100 +404,177 @@ func (srv *OpenKVServer) UpdateSubscriptions(
return nil, err
}
subscriptionErrors := []*openkv.SubscriptionError{}
if err := srv.data.Transaction(func(tx *gorm.DB) error {
for idx, sd := range in.SubscriptionData {
subscription := &sharedmodel.Subscription{}
err := srv.data.Where(
"subject_external_id = ? and subject_external_id_system = ? and service_config_id = ?",
sd.Subject.ExternalId,
sd.Subject.ExternalIdSystem,
serviceConfig.ID,
).First(subscription).Error
if err != nil && err != gorm.ErrRecordNotFound {
return err
} else if err != nil && sd.Subscribe {
sub := &sharedmodel.Subscription{
SubjectExternalId: sd.Subject.ExternalId,
SubjectExternalIdSystem: sd.Subject.ExternalIdSystem,
SubjectName: sd.Subject.Name,
SubjectBirthdate: sd.Subject.Birthdate,
ServiceConfigID: serviceConfig.ID,
}
// TODO check if it is valid metadata for the specified protocol
sub.SetProtocolMeta(sd.ProtocolMeta)
if err := srv.data.Create(sub).Error; err != nil {
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
Index: int32(idx),
Error: &openkv.Error{
Code: openkv.ErrServiceException,
Message: fmt.Sprintf("Subject with id: %v (%v) could not be persisted; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err),
},
})
}
errors := []*okapi.PatientRegistrationResult{}
for _, sd := range in.Registrations {
subscription := &sharedmodel.Subscription{}
err := srv.data.Where(
"id = ? and service_config_id = ?",
sd.Id,
sd.Subject.Identifier.Type,
serviceConfig.ID,
).First(subscription).Error
if err != nil && err != gorm.ErrRecordNotFound {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd.Id,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: fmt.Sprintf("Registration with id: %v could not be queried; %v", sd.Id, err),
},
})
continue
}
log.Printf("add subscription: %v", sd.Subject.ExternalId)
continue
} else if err != nil && !sd.Subscribe {
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
Index: int32(idx),
Error: &openkv.Error{
Code: openkv.ErrCodeAlreadySubscribed,
Message: fmt.Sprintf("Subject with id: %v (%v) already unsubscribed", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem),
},
})
continue
} else if !sd.Subscribe {
if err := srv.data.Unscoped().Delete(subscription).Error; err != nil {
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
Index: int32(idx),
Error: &openkv.Error{
Code: openkv.ErrServiceException,
Message: fmt.Sprintf("Subject with id: %v (%v) could not be removed; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err)},
})
}
log.Printf("delete subscription: %v", sd.Subject.ExternalId)
continue
if err != nil {
sub := &sharedmodel.Subscription{
ID: sd.Id,
SubjectExternalId: sd.Subject.Identifier.Value,
SubjectExternalIdSystem: sd.Subject.Identifier.Type,
SubjectDisplayName: sd.Subject.Name.Display,
SubjectGiven: sd.Subject.Name.Given,
SubjectOwnName: sd.Subject.Name.OwnName,
SubjectOwnNamePrefix: sd.Subject.Name.OwnNamePrefix,
SubjectPartnerName: sd.Subject.Name.PartnerName,
SubjectPartnerNamePrefix: sd.Subject.Name.PartnerNamePrefix,
SubjectBirthdate: sd.Subject.Birthdate,
SubjectAddressStreet: sd.Subject.Address.Street,
SubjectAddressStreetNumber: sd.Subject.Address.StreetNumber,
SubjectAddressPostalCode: sd.Subject.Address.PostalCode,
SubjectAddressCity: sd.Subject.Address.City,
SubjectAddressCountry: sd.Subject.Address.Country,
ServiceConfigID: serviceConfig.ID,
}
subscription.SubjectExternalId = sd.Subject.ExternalId
subscription.SubjectExternalIdSystem = sd.Subject.ExternalIdSystem
subscription.SubjectName = sd.Subject.Name
subscription.SubjectBirthdate = sd.Subject.Birthdate
subscription.SetProtocolMeta(sd.ProtocolMeta)
if err := srv.data.Save(subscription).Error; err != nil {
subscriptionErrors = append(subscriptionErrors, &openkv.SubscriptionError{
Index: int32(idx),
Error: &openkv.Error{
Code: openkv.ErrServiceException,
Message: fmt.Sprintf("Subject with id: %v (%v) could not be updated; %v", sd.Subject.ExternalId, sd.Subject.ExternalIdSystem, err)},
// TODO check if it is valid metadata for the specified protocol
sub.SetProtocolMeta(sd.CallbackProtocolData)
if err := srv.data.Create(sub).Error; err != nil {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd.Id,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: fmt.Sprintf("Registration with id: %v could not be persisted; %v", sd.Id, err),
},
})
continue
}
log.Printf("update subscription: %v", sd.Subject.ExternalId)
log.Printf("add subscription: %v", sd.Subject.Identifier.Value)
continue
}
return nil
}); err != nil {
// Update
subscription.SubjectExternalId = sd.Subject.Identifier.Value
subscription.SubjectExternalIdSystem = sd.Subject.Identifier.Type
subscription.SubjectDisplayName = sd.Subject.Name.Display
subscription.SubjectGiven = sd.Subject.Name.Given
subscription.SubjectOwnName = sd.Subject.Name.OwnName
subscription.SubjectOwnNamePrefix = sd.Subject.Name.OwnNamePrefix
subscription.SubjectPartnerName = sd.Subject.Name.PartnerName
subscription.SubjectPartnerNamePrefix = sd.Subject.Name.PartnerNamePrefix
subscription.SubjectBirthdate = sd.Subject.Birthdate
subscription.SubjectAddressStreet = sd.Subject.Address.Street
subscription.SubjectAddressStreetNumber = sd.Subject.Address.StreetNumber
subscription.SubjectAddressPostalCode = sd.Subject.Address.PostalCode
subscription.SubjectAddressCity = sd.Subject.Address.City
subscription.SubjectAddressCountry = sd.Subject.Address.Country
subscription.ServiceConfigID = serviceConfig.ID
subscription.SetProtocolMeta(sd.CallbackProtocolData)
if err := srv.data.Save(subscription).Error; err != nil {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd.Id,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: fmt.Sprintf("Registration with id: %v could not be updated; %v", sd.Id, err),
},
})
continue
}
log.Printf("update subscription: %v", sd.Subject.Identifier.Value)
}
resp := &okapi.CreateOrUpdatePatientRegistrationsResponse{
Results: errors,
}
return resp, nil
}
func (srv *OkAPIServer) RemovePatientRegistrations(
ctx context.Context, in *okapi.RemovePatientRegistrationsRequest,
) (*okapi.RemovePatientRegistrationsResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
return nil, err
}
resp := &openkv.UpdateSubscriptionsResponse{
Success: true,
Errors: subscriptionErrors,
serviceConfig, err := requireService(srv.data, conn, in.ServiceId)
if err != nil {
return nil, err
}
errors := []*okapi.PatientRegistrationResult{}
for _, sd := range in.Registrations {
subscription := &sharedmodel.Subscription{}
err := srv.data.Where(
"id = ? and service_config_id = ?",
sd,
serviceConfig.ID,
).First(subscription).Error
if err != nil && err != gorm.ErrRecordNotFound {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: fmt.Sprintf("Registration with id: %v could not be queried; %v", sd, err),
},
})
continue
}
if err != nil {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_AlreadyUnsubscribed,
Message: fmt.Sprintf("Registration with id: %v already unsubscribed; %v", sd, err),
},
})
continue
}
if err := srv.data.Unscoped().Delete(subscription).Error; err != nil {
errors = append(errors, &okapi.PatientRegistrationResult{
Id: sd,
Error: &okapi.OKAPIError{
Code: okapi.OKAPIErrorCode_GenericException,
Message: fmt.Sprintf("Registration with id: %v could not be removed; %v", sd, err),
},
})
continue
}
log.Printf("remove patient registration: %v", sd)
}
resp := &okapi.RemovePatientRegistrationsResponse{
Results: errors,
}
return resp, nil
}
func (srv *OpenKVServer) ListSubscriptions(
ctx context.Context, in *openkv.ListSubscriptionsRequest,
) (*openkv.ListSubscriptionsResponse, error) {
func (srv *OkAPIServer) ListPatientRegistrations(
ctx context.Context, in *okapi.ListPatientRegistrationsRequest,
) (*okapi.ListPatientRegistrationsResponse, error) {
conn, err := requireConnection(srv.data, ctx)
if err != nil {
@ -379,30 +591,46 @@ func (srv *OpenKVServer) ListSubscriptions(
srv.data.Where("service_config_id = ?", serviceConfig.ID).Find(&subscriptions)
subs := []*openkv.SubscriptionData{}
subs := []*okapi.PatientRegistrationData{}
for _, s := range subscriptions {
meta := map[string]string{}
meta := map[string]interface{}{}
s.GetProtocolMeta(&meta)
subs = append(subs, &openkv.SubscriptionData{
Subject: &openkv.PatientMeta{
ExternalId: s.SubjectExternalId,
ExternalIdSystem: s.SubjectExternalIdSystem,
Name: s.SubjectName,
Birthdate: s.SubjectBirthdate,
subs = append(subs, &okapi.PatientRegistrationData{
Id: fmt.Sprintf("%v", s.ID),
Subject: &okapi.PatientMeta{
Identifier: &okapi.Identifier{
Type: s.SubjectExternalIdSystem,
Value: s.SubjectExternalId,
},
Name: &okapi.Name{
Display: s.SubjectDisplayName,
Given: s.SubjectGiven,
OwnName: s.SubjectOwnName,
OwnNamePrefix: s.SubjectOwnNamePrefix,
PartnerName: s.SubjectPartnerName,
PartnerNamePrefix: s.SubjectPartnerNamePrefix,
},
Birthdate: s.SubjectBirthdate,
Address: &okapi.Address{
Street: s.SubjectAddressStreet,
StreetNumber: s.SubjectAddressStreetNumber,
PostalCode: s.SubjectAddressPostalCode,
City: s.SubjectAddressCity,
Country: s.SubjectAddressCountry,
},
},
ProtocolMeta: meta,
CallbackProtocolData: toStruct(meta),
})
}
resp := &openkv.ListSubscriptionsResponse{
Success: true,
SubscriptionData: subs,
resp := &okapi.ListPatientRegistrationsResponse{
PatientRegistrationData: subs,
}
return resp, nil
}
func NewServer() *OpenKVServer {
return &OpenKVServer{}
func NewServer() *OkAPIServer {
return &OkAPIServer{}
}

@ -2,6 +2,7 @@ package main
import (
"context"
"crypto/tls"
"fmt"
"io"
"log"
@ -9,14 +10,15 @@ import (
"github.com/gin-gonic/gin"
"gorm.io/gorm"
"whiteboxsystems.nl/openkvpoc/dvzaservice/model"
"whiteboxsystems.nl/openkvpoc/sharedmodel"
"whiteboxsystems.nl/okapidemo/dvzaservice/model"
"whiteboxsystems.nl/okapidemo/sharedmodel"
)
type UIService struct {
srv *http.Server
inited bool
data *gorm.DB
srv *http.Server
inited bool
data *gorm.DB
clientCert tls.Certificate
}
func (srv *UIService) LoadData(location string) error {
@ -36,7 +38,7 @@ func (srv *UIService) ListenAndServe() {
if !srv.inited {
srv.init()
}
log.Println("Listening on %v", srv.srv.Addr)
log.Printf("Listening on %v\n", srv.srv.Addr)
srv.srv.ListenAndServe()
}
@ -109,13 +111,26 @@ func (srv *UIService) GetPatient(c *gin.Context) {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Authorization", serviceConfig.FetchProtocol.AuthConfig.Raw)
resp, err := http.DefaultClient.Do(req)
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
},
}
resp, err := client.Do(req)
if err != nil {
c.AbortWithError(500, err)
return
}
if resp.StatusCode >= 400 {
c.AbortWithError(resp.StatusCode, fmt.Errorf("%v", resp.Status))
return
}
io.Copy(c.Writer, resp.Body)
}
@ -199,10 +214,11 @@ func (srv *UIService) GetRegistrations(c *gin.Context) {
// }
func NewUIServer(addr string) *UIService {
cert := loadCert()
srv := &UIService{srv: &http.Server{
Addr: addr,
Handler: gin.Default(),
}}
}, clientCert: *cert}
return srv
}

Loading…
Cancel
Save