You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
okapidemo/his/srv.go

632 lines
15 KiB

package main
import (
"context"
"crypto"
"crypto/tls"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"
"github.com/gin-gonic/gin"
"google.golang.org/grpc/credentials"
"gorm.io/gorm"
"src.whiteboxsystems.nl/decozo/okapi"
"src.whiteboxsystems.nl/decozo/okapidemo/cryptoutil"
"src.whiteboxsystems.nl/decozo/okapidemo/his/model"
"src.whiteboxsystems.nl/decozo/okapidemo/sharedmodel"
)
func loadCert() *tls.Certificate {
_, err := os.Stat("certs/client.crt")
if err != nil {
_, _, certPem, keyPem, err := cryptoutil.GenCert("whitebox", "whitebox")
if err != nil {
panic(err)
}
if err != nil {
panic(err)
}
if err := ioutil.WriteFile("certs/client.crt", []byte(certPem), 0600); err != nil {
panic(err)
}
if err := ioutil.WriteFile("certs/client.key", []byte(keyPem), 0600); err != nil {
panic(err)
}
}
certificate, err := tls.LoadX509KeyPair("certs/client.crt", "certs/client.key")
if err != nil {
panic("Load client certification failed: " + err.Error())
}
return &certificate
}
func loadKeyPair() credentials.TransportCredentials {
certificate := loadCert()
tlsConfig := &tls.Config{
ClientAuth: tls.RequestClientCert,
Certificates: []tls.Certificate{*certificate},
// ClientCAs: capool,
}
return credentials.NewTLS(tlsConfig)
}
type HISServer struct {
srv *http.Server
inited bool
data *gorm.DB
stopTasks chan struct{}
clientCert tls.Certificate
}
func (srv *HISServer) LoadData(location string) error {
var err error
srv.data, err = model.GetDB(location)
return err
}
func (srv *HISServer) Addr() string {
if srv.srv == nil {
return ""
}
return srv.srv.Addr
}
func (srv *HISServer) ListenAndServe() {
if !srv.inited {
srv.init()
}
log.Println("Listening on %v", srv.srv.Addr)
srv.srv.ListenAndServeTLS("", "")
}
func (srv *HISServer) Shutdown(ctx context.Context) error {
return srv.srv.Shutdown(ctx)
}
func (srv *HISServer) init() {
r := srv.srv.Handler.(*gin.Engine)
r.LoadHTMLGlob("templates/*")
r.Static("/assets", "./assets")
r.GET("/", func(c *gin.Context) {
c.Redirect(301, "/ui")
})
r.GET("/ui", srv.GetIndex)
r.GET("/ui/*page", srv.GetIndex)
r.GET("/api/patients", srv.GetPatients)
r.POST("/api/patients/:id/consent", srv.UpdateConsent)
r.DELETE("/api/patients/:id/consent/:consentID", srv.DeleteConsent)
r.GET("/api/serviceProviders", srv.GetServiceProviders)
r.POST("/api/serviceProviders", srv.NewServiceProvider)
r.POST("/api/serviceProviders/:id/activate", srv.ActivateServiceProvider)
r.POST("/api/serviceProviders/:id/services", srv.ModService)
r.GET("/api/serviceProviders/:id", srv.GetServiceProvider)
r.GET("/api/services", srv.GetServices)
r.POST("/api/services/:id/subscriptions", srv.UpdateSubscription)
r.Use(srv.Authenticate)
r.GET("/external/api/patients/:id", srv.GetPatient)
r.GET("/external/fhir/Patient", srv.GetFHIRPatient)
srv.inited = true
ticker := time.NewTicker(30 * time.Second)
srv.stopTasks = make(chan struct{})
srv.TaskSyncPatients()
go func() {
for {
select {
case <-ticker.C:
srv.TaskSyncPatients()
case <-srv.stopTasks:
ticker.Stop()
return
}
}
}()
}
func (srv *HISServer) GetIndex(c *gin.Context) {
c.HTML(http.StatusOK, "index.html", gin.H{})
}
func (srv *HISServer) diffPatients(a []*okapi.PatientRegistrationData, b []model.Patient) []model.Patient {
mappedPatients := map[string]bool{}
diff := []model.Patient{}
for _, p := range a {
mappedPatients[p.Id] = true
}
for _, p := range b {
if _, ok := mappedPatients[p.ExternalId]; !ok {
diff = append(diff, p)
}
}
return diff
}
func (srv *HISServer) intersectPatients(a []*okapi.PatientRegistrationData, b []model.Patient) []model.Patient {
mappedPatients := map[string]bool{}
intersect := []model.Patient{}
for _, p := range a {
mappedPatients[p.Id] = true
}
for _, p := range b {
if _, ok := mappedPatients[p.ExternalId]; ok {
intersect = append(intersect, p)
}
}
return intersect
}
func (srv *HISServer) TaskSyncPatients() {
services := []model.Service{}
srv.data.Preload("ServiceProvider").Preload("ServiceProvider.AuthConfig").Preload("Subscriptions").Find(&services)
patients := []model.Patient{}
srv.data.Find(&patients)
mappedPatients := map[string]model.Patient{}
for _, pat := range patients {
mappedPatients[pat.ExternalId] = pat
}
for _, service := range services {
mappedSubscriptions := map[string]model.Patient{}
for _, sub := range service.Subscriptions {
mappedSubscriptions[sub.ExternalId] = sub
}
activePatients := []model.Patient{}
inactivePatients := []model.Patient{}
for _, p := range patients {
if _, ok := mappedSubscriptions[p.ExternalId]; ok {
if service.SubscriptionPolicy == okapi.SubscriptionPolicy_optout {
// Subscriptions means opted out, so unsubscribe
inactivePatients = append(inactivePatients, p)
} else {
activePatients = append(activePatients, p)
}
} else {
if service.SubscriptionPolicy == okapi.SubscriptionPolicy_optout {
activePatients = append(activePatients, p)
} else {
inactivePatients = append(inactivePatients, p)
}
}
}
currentPatients, err := srv.listPatientRegistrations(service.ServiceProvider, &service)
if err != nil {
log.Println("Could not retrieve current subscriptions: %v", service.Name, err)
continue
}
toSubscribe := srv.diffPatients(currentPatients, activePatients)
if len(toSubscribe) > 0 {
_, err := srv.subscribePatients(service.ServiceProvider, &service, activePatients)
if err != nil {
log.Println("active patients for optout (%v) err: %v", service.Name, err)
} else {
log.Printf("sync patients for %v: %v patients subscribed", service.Name, len(toSubscribe))
}
}
toUnSubscribe := srv.intersectPatients(currentPatients, inactivePatients)
if len(toUnSubscribe) > 0 {
_, err = srv.unsubscribePatients(service.ServiceProvider, &service, inactivePatients)
if err != nil {
log.Println("inactive patients for optout (%v) err: %v", service.Name, err)
} else {
log.Printf("sync patients for %v: %v patients unsubscribed", service.Name, len(toUnSubscribe))
}
}
if len(toSubscribe) == 0 && len(toUnSubscribe) == 0 {
log.Println("Patient sync, nothing to sync")
}
}
}
func (srv *HISServer) Authenticate(c *gin.Context) {
if !strings.HasPrefix(c.Request.RequestURI, "/external/") {
return
}
raw := ""
method := ""
if len(c.Request.TLS.PeerCertificates) > 0 {
jwk, err := cryptoutil.PublicKeyToJWK(c.Request.TLS.PeerCertificates[0].PublicKey)
if err != nil {
log.Printf("Error extracting public key JKW: %v", err)
c.Status(401)
c.Abort()
return
}
rawBytes, _ := jwk.Thumbprint(crypto.SHA256)
raw = fmt.Sprintf("%X", rawBytes)
method = sharedmodel.AuthMethodDecozoMTLS
} else {
raw = c.Request.Header.Get("Authorization")
method = sharedmodel.AuthMethodDecozoBearerToken
}
authConfig := &sharedmodel.AuthConfig{}
if err := srv.data.Where("raw = ? and method = ?", raw, method).First(authConfig).Error; err != nil {
c.Status(401)
c.Abort()
return
}
if strings.HasPrefix(c.Request.RequestURI, "/external/api") {
service := &model.Service{}
if srv.data.Where(
"auth_config_id = ? and service_id like ?", authConfig.ID, "wbx:%",
).First(service).Error == nil {
c.Set("authenticatedService", service)
return
}
}
if strings.HasPrefix(c.Request.RequestURI, "/external/fhir") {
service := &model.Service{}
if srv.data.Where(
"auth_config_id = ? and service_id like ?", authConfig.ID, "%:dvza",
).First(service).Error == nil {
c.Set("authenticatedService", service)
return
}
}
c.Status(401)
c.Abort()
}
func (srv *HISServer) GetPatients(c *gin.Context) {
patients := []model.Patient{}
if err := srv.data.Preload("Consent").Find(&patients).Error; err != nil {
c.AbortWithError(500, err)
return
}
c.JSON(200, patients)
}
func (srv *HISServer) GetServiceProviders(c *gin.Context) {
connections := []model.ServiceProvider{}
if err := srv.data.Preload("AuthConfig").Preload("Services").Find(&connections).Error; err != nil {
c.AbortWithError(500, err)
return
}
c.JSON(200, connections)
}
func (srv *HISServer) GetServices(c *gin.Context) {
services := []model.Service{}
if err := srv.data.Preload("ServiceProvider").Preload("Subscriptions").Find(&services).Error; err != nil {
c.AbortWithError(500, err)
return
}
c.JSON(200, services)
}
func (srv *HISServer) GetServiceProvider(c *gin.Context) {
connID := c.Param("id")
connection := &model.ServiceProvider{}
if err := srv.data.Where("id = ?", connID).Preload("AuthConfig").Preload("Services").Find(connection).Error; err != nil {
c.AbortWithError(404, err)
return
}
serviceMeta, err := srv.listServices(connection)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(200, map[string]interface{}{"connection": connection, "meta": serviceMeta.Services})
}
func (srv *HISServer) ModService(c *gin.Context) {
var payload struct {
Active bool `json:"active"`
Service string `json:"service"`
}
c.BindJSON(&payload)
connID := c.Param("id")
connection := &model.ServiceProvider{}
if err := srv.data.Where("id = ?", connID).Preload("AuthConfig").Preload("Services").Find(connection).Error; err != nil {
c.AbortWithError(404, err)
return
}
var err error
if payload.Active {
err = srv.enableService(connection, payload.Service)
} else {
err = srv.disableService(connection, payload.Service)
}
if err != nil {
log.Println("err: %v", err)
c.AbortWithError(400, err)
return
}
c.Status(200)
}
func (srv *HISServer) applyPolicy(sub bool, service *model.Service) bool {
if service.SubscriptionPolicy == okapi.SubscriptionPolicy_optout {
return !sub
}
return sub
}
func (srv *HISServer) UpdateConsent(c *gin.Context) {
consent := &model.Consent{}
c.BindJSON(consent)
pId, _ := strconv.ParseUint(c.Param("id"), 10, 64)
consent.PatientID = uint(pId)
if consent.ID != 0 {
srv.data.Save(consent)
c.Status(200)
} else {
srv.data.Create(consent)
c.Status(201)
}
}
func (srv *HISServer) DeleteConsent(c *gin.Context) {
consent := &model.Consent{}
srv.data.Unscoped().Where("id = ? and patient_id = ?", c.Param("consentID"), c.Param("id")).Delete(consent)
c.Status(200)
}
func (srv *HISServer) UpdateSubscription(c *gin.Context) {
var payload struct {
Active bool `json:"active"`
Patient uint `json:"patient"`
}
c.BindJSON(&payload)
serviceID := c.Param("id")
service := &model.Service{}
if err := srv.data.Where("id = ?", serviceID).Preload("ServiceProvider").Preload("ServiceProvider.AuthConfig").Find(service).Error; err != nil {
c.AbortWithError(404, err)
return
}
sub := &model.Patient{}
srv.data.Model(service).Where("id = ?", payload.Patient).Association("Subscriptions").Find(sub)
if payload.Active && sub.ID != 0 {
log.Printf("No update needed: %v %v", payload.Patient, service.ServiceID)
c.Status(200)
return
} else if !payload.Active && sub.ID == 0 {
log.Printf("No update needed: %v %v", payload.Patient, service.ServiceID)
c.Status(200)
return
}
patient := &model.Patient{}
if err := srv.data.Where("id = ?", payload.Patient).Find(patient).Error; err != nil {
c.AbortWithError(404, err)
return
}
if srv.applyPolicy(payload.Active, service) {
resp, err := srv.subscribePatients(service.ServiceProvider, service, []model.Patient{*patient})
if err != nil {
log.Println("err: %v", err)
c.AbortWithError(400, err)
return
}
for _, result := range resp.Results {
if result.Error != nil {
log.Printf("Error for subscriptions")
c.AbortWithStatusJSON(http.StatusBadRequest, map[string]string{"error": result.Error.Message})
return
}
}
} else {
resp, err := srv.unsubscribePatients(service.ServiceProvider, service, []model.Patient{*patient})
if err != nil {
log.Println("err: %v", err)
c.AbortWithError(400, err)
return
}
for _, result := range resp.Results {
if result.Error != nil {
log.Printf("Error for subscriptions")
c.AbortWithStatusJSON(http.StatusBadRequest, map[string]string{"error": result.Error.Message})
return
}
}
}
if payload.Active {
srv.data.Model(service).Association("Subscriptions").Append(patient)
} else {
srv.data.Model(service).Association("Subscriptions").Delete(patient)
}
c.Status(201)
}
func (srv *HISServer) NewServiceProvider(c *gin.Context) {
var payload struct {
URL string `json:"url"`
}
c.BindJSON(&payload)
conn, err := srv.register(payload.URL)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(201, conn)
}
func (srv *HISServer) ActivateServiceProvider(c *gin.Context) {
connID := c.Param("id")
connection := &model.ServiceProvider{}
if err := srv.data.Preload("AuthConfig").Where("id = ?", connID).First(connection).Error; err != nil {
c.AbortWithError(404, err)
return
}
var payload struct {
PSK string `json:"psk"`
}
c.BindJSON(&payload)
conn, err := srv.activate(connection, payload.PSK)
if err != nil {
c.AbortWithError(400, err)
return
}
c.JSON(201, conn)
}
func (srv *HISServer) GetPatient(c *gin.Context) {
id := c.Param("id")
patient := &model.Patient{}
if err := srv.data.Where("patient_id = ?", id).First(patient).Error; err == nil {
serviceAny, _ := c.Get("authenticatedService")
service, _ := serviceAny.(*model.Service)
subscribed := srv.data.Raw(
"select * from service_patients where service_id = ? and patient_id = ?",
service.ID,
patient.ID,
).Scan(&map[string]interface{}{}).RowsAffected > 0
if !srv.applyPolicy(subscribed, service) {
c.Status(http.StatusUnauthorized)
return
}
f, err := os.Open(path.Join("./patients", patient.FileBase+".edi"))
if err != nil {
c.Error(err)
return
}
io.Copy(c.Writer, f)
return
}
c.JSON(404, nil)
}
func (srv *HISServer) GetFHIRPatient(c *gin.Context) {
id := c.Query("id")
patient := &model.Patient{}
if err := srv.data.Where("patient_id = ?", id).First(patient).Error; err == nil {
serviceAny, _ := c.Get("authenticatedService")
service, _ := serviceAny.(*model.Service)
subscribed := srv.data.Raw(
"select * from service_patients where service_id = ? and patient_id = ?",
service.ID,
patient.ID,
).Scan(&map[string]interface{}{}).RowsAffected > 0
if !srv.applyPolicy(subscribed, service) {
c.Status(http.StatusUnauthorized)
return
}
f, err := os.Open(path.Join("./patients", patient.FileBase+".fhir.json"))
if err != nil {
c.Error(err)
return
}
io.Copy(c.Writer, f)
return
}
c.JSON(404, nil)
}
func NewServer(addr string) *HISServer {
cert := loadCert()
srv := &HISServer{
srv: &http.Server{
Addr: addr,
Handler: gin.Default(),
TLSConfig: &tls.Config{
ClientAuth: tls.RequestClientCert,
GetCertificate: func(*tls.ClientHelloInfo) (*tls.Certificate, error) {
return cert, nil
},
},
},
clientCert: *cert,
}
return srv
}