bloodlabnet

package module
v0.6.5
Published: Apr 28, 2025 License: Apache-2.0 Imports: 18 Imported by: 0

README

Go BloodLab Net

A library to simplify communication with laboratory instruments.

Install

go get github.com/blutspende/go-bnet

Features
  • TCP/IP Server implementation
  • TCP/IP Client implementation
  • FTP Client implementation
  • Low-level protocols :
    • RAW protocol.Raw()
    • STX-ETX protocol.STXETX()
    • MLLP (for HL7) protocol.MLLP()
    • Lis1A1 protocol.Lis1A1()
TCP/IP Client

Create a client to send data.

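 tcpClient := CreateNewTCPClient("127.0.0.1", 4001, protocol.Raw(), NoLoadBalancer)

 if err := tcpClient.Connect(); err != nil {
   log.Panic(err)
 }

 // Send takes a slice of byte slices ([][]byte)
 n, err := tcpClient.Send([][]byte{[]byte("Hello TCP/IP")})
 if err != nil {
   log.Panic(err)
 }
 log.Printf("Send returned %d", n)

 receivedMsg, err := tcpClient.Receive()
 if err != nil {
   log.Panic(err)
 }
 log.Printf("received: %s", receivedMsg)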
TCP/IP Server
package main

import (
	"fmt"
	"time"

	bloodlabnet "github.com/blutspende/go-bnet"
	"github.com/blutspende/go-bnet/protocol"
)

type MySessionHandler struct {
}

func (s *MySessionHandler) Connected(session bloodlabnet.Session) error {
	fmt.Println("Connected Event")
	return nil // returning a non-nil error declines the connection
}

func (s *MySessionHandler) Disconnected(session bloodlabnet.Session) {
	fmt.Println("Disconnected Event")
}

func (s *MySessionHandler) Error(session bloodlabnet.Session, errorType bloodlabnet.ErrorType, err error) {
	fmt.Println(err)
}

func (s *MySessionHandler) DataReceived(session bloodlabnet.Session, data []byte, receiveTimestamp time.Time) error {

	rad, _ := session.RemoteAddress()
	fmt.Printf("From %s received: %s\n", rad, string(data))

	// Send takes a slice of byte slices ([][]byte)
	_, err := session.Send([][]byte{[]byte(fmt.Sprintf("You are sending from %s", rad))})
	return err
}

func main() {

	server := bloodlabnet.CreateNewTCPServerInstance(4009,
		protocol.STXETX(),
		bloodlabnet.HAProxySendProxyV2,
		100) // Max Connections

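	// Run blocks until the server stops and returns the reason as an error
	if err := server.Run(&MySessionHandler{}); err != nil {
		fmt.Println(err)
	}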
}

Protocols

Raw Protocol (TCP/Client + TCP/Server)

Raw communication over TCP/IP.

STX-ETX Protocol (TCP/Client + TCP/Server)

Messages are embedded between <STX> (ASCII 0x02) and <ETX> (ASCII 0x03) to indicate start and end. At the end of each transmission, the transmission's contents are passed on to higher-level protocols.

 .... <STX>Some data<ETX> This data here is ignored <STX>More data<ETX> ....
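The framing rule above can be sketched with the standard library alone. This is a minimal illustration, not the implementation behind protocol.STXETX(); the function name extractFrames is hypothetical, and dropping an unterminated trailing frame is an assumption made for this sketch.

```go
package main

import "fmt"

const (
	stx = 0x02 // start of text
	etx = 0x03 // end of text
)

// extractFrames returns the payload of every complete <STX>...<ETX> frame
// found in stream. Bytes outside a frame are skipped, and an unterminated
// frame at the end of the stream is dropped.
func extractFrames(stream []byte) [][]byte {
	var frames [][]byte
	var current []byte
	inFrame := false
	for _, b := range stream {
		switch {
		case b == stx:
			// A new <STX> starts (or restarts) a frame.
			inFrame = true
			current = current[:0]
		case b == etx && inFrame:
			// Copy the payload so later frames do not overwrite it.
			frames = append(frames, append([]byte(nil), current...))
			inFrame = false
		case inFrame:
			current = append(current, b)
		}
	}
	return frames
}

func main() {
	stream := []byte("..\x02Some data\x03 This data here is ignored \x02More data\x03..")
	for _, frame := range extractFrames(stream) {
		fmt.Printf("%q\n", frame)
	}
}
```

Each extracted payload is what a higher-level protocol would receive as one transmission.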
MLLP Protocol (TCP/Client + TCP/Server)

Messages are embedded between <VT> (ASCII 11) and <FS> (ASCII 28), terminated with <CR> (ASCII 13), to indicate start and end. At the end of each transmission, the transmission's contents are passed on to higher-level protocols.

 .... <VT>Some data<FS><CR> This data here is ignored <VT>More data<FS><CR> ....

For some vendors the start/stop bytes might be altered. In that case, start from DefaultMLLPProtocolSettings and use SetStartByte and SetStopByte to change them.

tcpServer := bloodlabnet.CreateNewTCPServerInstance(config.TCPListenerPort,
  bloodlabnetProtocol.MLLP(bloodlabnetProtocol.DefaultMLLPProtocolSettings().SetStartByte(0)),
  bloodlabnet.HAProxySendProxyV2, config.TCPServerMaxConnections)
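To make the default MLLP framing concrete, here is a small standard-library sketch that wraps a payload and extracts payloads from a stream. It is an illustration under the default bytes described in this section, not the code behind protocol.MLLP(); wrapMLLP and unwrapMLLP are hypothetical names.

```go
package main

import (
	"bytes"
	"fmt"
)

const (
	vt = 0x0B // start byte, ASCII 11
	fs = 0x1C // stop byte, ASCII 28
	cr = 0x0D // terminator, ASCII 13
)

// wrapMLLP frames payload as <VT>payload<FS><CR>.
func wrapMLLP(payload []byte) []byte {
	out := make([]byte, 0, len(payload)+3)
	out = append(out, vt)
	out = append(out, payload...)
	return append(out, fs, cr)
}

// unwrapMLLP returns the payloads of all complete frames in stream.
// Bytes between frames are ignored; an incomplete trailing frame is dropped.
func unwrapMLLP(stream []byte) [][]byte {
	var frames [][]byte
	for {
		start := bytes.IndexByte(stream, vt)
		if start < 0 {
			return frames
		}
		stream = stream[start+1:]
		end := bytes.Index(stream, []byte{fs, cr})
		if end < 0 {
			return frames // no <FS><CR> yet: frame is incomplete
		}
		frames = append(frames, stream[:end])
		stream = stream[end+2:]
	}
}

func main() {
	stream := append(wrapMLLP([]byte("Some data")), []byte(" ignored ")...)
	stream = append(stream, wrapMLLP([]byte("More data"))...)
	for _, frame := range unwrapMLLP(stream) {
		fmt.Printf("%q\n", frame)
	}
}
```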
Lis1A1 Protocol (TCP/Client + TCP/Server)

Lis1A1 is a low-level protocol for submitting data to laboratory instruments, typically via serial line. Settings for the Lis1A1 low-level protocol (multiple settings can be chained):

// enables/disables verifying the checksum of received messages
EnableStrictChecksum() | DisableStrictChecksum() 

// enables/disables adding the frame number to the start of the sent message frame
EnableFrameNumber() | DisableFrameNumber()

// enables/disables verifying the frame number of received messages
EnableStrictFrameOrder() | DisableStrictFrameOrder()

// enables/disables sending <CR><ETX> as frame end, by default frame end is <ETX>
EnableAppendCarriageReturnToFrameEnd() | DisableAppendCarriageReturnToFrameEnd()

// sets line ending, by default line end is <CR><LF>
SetLineEnding(lineEnding []byte)
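For orientation, the sketch below shows what the frame number and checksum settings refer to. It follows the frame layout of the LIS1-A (formerly ASTM E1381) standard as generally described: <STX>, a frame number digit, the text, <ETX>, a two-digit hexadecimal checksum, then <CR><LF>. It does not reproduce this library's implementation; buildFrame and checksum are hypothetical names, and only final frames (ending in <ETX>) are handled.

```go
package main

import "fmt"

const (
	stx = 0x02
	etx = 0x03
	cr  = 0x0D
	lf  = 0x0A
)

// checksum adds the frame number, the text and <ETX> modulo 256 and
// renders the result as two uppercase hex digits. <STX> is not included.
func checksum(frameNumber byte, text []byte) string {
	sum := frameNumber
	for _, b := range text {
		sum += b // byte arithmetic wraps around at 256
	}
	sum += etx
	return fmt.Sprintf("%02X", sum)
}

// buildFrame assembles <STX> FN text <ETX> C1 C2 <CR><LF>.
// n counts frames starting at 1; the frame number cycles 1..7, 0, 1, ...
func buildFrame(n int, text []byte) []byte {
	fn := byte('0' + n%8)
	frame := []byte{stx, fn}
	frame = append(frame, text...)
	frame = append(frame, etx)
	frame = append(frame, checksum(fn, text)...)
	return append(frame, cr, lf)
}

func main() {
	fmt.Printf("%q\n", buildFrame(1, []byte("H|\\^&")))
	fmt.Printf("%q\n", buildFrame(8, []byte("L|1|N")))
}
```

A receiver with strict checksum verification would recompute the two hex digits and reject the frame on a mismatch; with strict frame order it would also compare the frame number digit against the expected one.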
au6xx Protocol (TCP/Client + TCP/Server)

The au6xx protocol is the low-level protocol required for connecting to Beckman Coulter AU6xx systems.

TCP/IP Server Configuration

Connection timeout

Port scanners and load balancers often connect only to disconnect a second later. To avoid creating sessions for such connections, by default a session is initiated not on connect but when the first byte is sent.

Note that certain instruments require the connection to remain open, even if there is no data.

Disable the connection-timeout
config := DefaultTCPServerSettings
config.SessionAfterFirstByte = false // Default: true
Require the first byte to be sent within a time limit
config := DefaultTCPServerSettings
config.SessionInitiationTimeout = time.Second * 3  // Default: 30 seconds (see DefaultTCPServerSettings)
Configure blacklist
tcpServerSettings := bnet.DefaultTCPServerSettings
if len(config.BlackListedTCPClientIPAddresses) > 0 {
	tcpServerSettings.BlackListedIPAddresses = strings.Split(config.BlackListedTCPClientIPAddresses, ",")
}
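The length check in the snippet above matters because strings.Split on an empty string yields a slice with one empty element. The following standard-library sketch shows one way to parse such a comma-separated setting and to test a remote address against it. How the library itself matches addresses is not documented here, so exact string comparison on the host part is an assumption; parseBlacklist and isBlacklisted are hypothetical names.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// parseBlacklist splits a comma-separated list of IP addresses,
// trims whitespace and drops empty entries.
func parseBlacklist(csv string) []string {
	var ips []string
	for _, part := range strings.Split(csv, ",") {
		if ip := strings.TrimSpace(part); ip != "" {
			ips = append(ips, ip)
		}
	}
	return ips
}

// isBlacklisted reports whether the host part of remoteAddr
// (for example "10.0.0.2:5000") appears in blacklist.
func isBlacklisted(remoteAddr string, blacklist []string) bool {
	host, _, err := net.SplitHostPort(remoteAddr)
	if err != nil {
		host = remoteAddr // no port present, compare the address as given
	}
	for _, ip := range blacklist {
		if ip == host {
			return true
		}
	}
	return false
}

func main() {
	blacklist := parseBlacklist("10.0.0.1, 10.0.0.2,")
	fmt.Println(blacklist)
	fmt.Println(isBlacklisted("10.0.0.2:5000", blacklist))
	fmt.Println(isBlacklisted("10.0.0.3:5000", blacklist))
}
```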

Add low-level Logging: Protocol-Logger

Logging can be added to any protocol by wrapping the protocol in the logger. This does not affect the functionality. In addition, set the environment variable PROTOLOG_ENABLE to one of the following values.

PROTOLOG_ENABLE = (traffic|extended)

  1. traffic = logs only the traffic
  2. extended = logs the traffic plus the states of the FSM
set PROTOLOG_ENABLE=extended
tcpServer := bloodlabnet.CreateNewTCPServerInstance(config.TCPListenerPort,
  bloodlabnetProtocol.Logger(
    bloodlabnetProtocol.MLLP(bloodlabnetProtocol.DefaultMLLPProtocolSettings().SetStartByte(0)),
  ),
  bloodlabnet.HAProxySendProxyV2, config.TCPServerMaxConnections)

FTP Client

The FTP client connects to an FTP server and polls at intervals for new data. New files are treated as incoming transmissions, as if they were submitted through a socket. When initializing, a strategy must be chosen for dealing with already processed files: move them to a save folder or delete them.

Data sent is dropped as a file onto the server. The filenames are generated by a generator function passed during initialization.

If the client loses the connection to the server during a timeout, the connection is reestablished and normal operation continues.

// Initialize the FTP client
bnetFtpClient := CreateNewFTPClient("127.0.0.1", 21, "test", "test", TESTDIR, "*.dat",
	"out", ".out", DefaultFTPFilnameGenerator, PROCESS_STRATEGY_MOVE2SAVE, "\n",
	30*time.Second) // pollInterval (example value; 10-180 seconds is suggested)

// Start polling. Note that the Run function exits on serious errors
go func() {
	err := bnetFtpClient.Run(th) // th is a Handler implementation
	log.Error(err)
	os.Exit(-1)
}()
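A custom generator can be passed in place of DefaultFTPFilnameGenerator. The sketch below assumes only the FTPFilenameGeneratorFunction signature listed in the API documentation and redeclares that type locally so the example compiles on its own. hashFilenameGenerator is a hypothetical name, and naming files after a content hash is just one possible scheme, not what the default generator does.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// FTPFilenameGeneratorFunction mirrors the type from the package
// documentation so that this sketch is self-contained.
type FTPFilenameGeneratorFunction func([]byte, string) (string, error)

// hashFilenameGenerator derives the filename from the first 8 bytes of the
// SHA-256 hash of the content, so identical content maps to the same name.
func hashFilenameGenerator(data []byte, fileExtension string) (string, error) {
	if len(data) == 0 {
		return "", errors.New("refusing to name an empty file")
	}
	sum := sha256.Sum256(data)
	name := hex.EncodeToString(sum[:8])
	// Accept the extension with or without a leading dot.
	if fileExtension != "" && !strings.HasPrefix(fileExtension, ".") {
		fileExtension = "." + fileExtension
	}
	return name + fileExtension, nil
}

func main() {
	var generate FTPFilenameGeneratorFunction = hashFilenameGenerator
	name, err := generate([]byte("H|\\^&|||"), ".out")
	fmt.Println(name, err)
}
```

A content-based name makes repeated uploads of the same data overwrite one file; a timestamp-based scheme would be the choice when every transmission has to be kept.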

Documentation

Constants

This section is empty.

Variables

var DefaultTCPClientSettings = TCPClientConfiguration{
	Timeout:                 time.Second * 3,
	Deadline:                time.Millisecond * 200,
	FlushBufferTimoutMs:     500,
	PollInterval:            time.Second * 60,
	SessionAfterFirstByte:   true,
	SessionInitationTimeout: time.Second * 0,
	SourceIP:                "",
}
var DefaultTCPServerSettings = TCPServerConfiguration{
	Timeout:                  time.Second * 3,
	Deadline:                 time.Millisecond * 200,
	FlushBufferTimoutMs:      500,
	PollInterval:             time.Second * 60,
	SessionAfterFirstByte:    true,
	SessionInitiationTimeout: time.Second * 30,
}
var ErrConnectionFailed = fmt.Errorf("connction error")
var ErrDeleteFile = fmt.Errorf("failed to delete file")
var ErrDownloadFileFailed = fmt.Errorf("donwload files failed")
var ErrExited = fmt.Errorf("server exited")
var ErrFtpLoginFailed = fmt.Errorf("FTP Login Failed")
var ErrListFilesFailed = fmt.Errorf("failed to access files for listing")
var ErrSendFileFailed = fmt.Errorf("sending file failed")

Functions

func CreateNewFTPClient

func CreateNewFTPClient(hostname string, hostport int,
	username string, password string,
	inputFilePath, inputFilePattern,
	outputFilePath string,
	outputFileExtension string,
	filenameGeneratorFunction FTPFilenameGeneratorFunction,
	processStrategy ProcessStrategy,
	lineBreak string,
	pollInterval time.Duration,
) *ftpConnectionAndSession

A pollInterval of 10-180 seconds is reasonable, depending on the server.

func DefaultFTPFilnameGenerator

func DefaultFTPFilnameGenerator(data []byte, fileExtension string) (string, error)

Default generator function

Types

type BufferedConn

type BufferedConn struct {
	net.Conn // So that most methods are embedded
	// contains filtered or unexported fields
}

BufferedConn wraps net.Conn and stays compatible with it, for better reading performance and a peek preview.

func (BufferedConn) FirstByteOrError

func (b BufferedConn) FirstByteOrError(howLong time.Duration) error

func (BufferedConn) Peek

func (b BufferedConn) Peek(n int) ([]byte, error)

func (BufferedConn) Read

func (b BufferedConn) Read(p []byte) (int, error)

type ConnectionAndSessionInstance

type ConnectionAndSessionInstance interface {
	Connect() error
	ConnectionInstance
	Session
}

func CreateNewTCPClient

func CreateNewTCPClient(hostname string, port int,
	lowLevelProtocol protocol.Implementation,
	proxy ConnectionType, timing ...TCPClientConfiguration) ConnectionAndSessionInstance

type ConnectionInstance

type ConnectionInstance interface {
	// Send  data directly from instance. This will not work if the instance
	Send(data [][]byte) (int, error)
	//Receive data directly from instance. This will not work if the instance handles many connections like TCP-Servers
	Receive() ([]byte, error)
	// Run - Main-Loop. Blocking.&
	Run(handler Handler) error
	// Stop the main-loop of the Run-handler
	Stop()
	// Retrieve a session by IP. Do not use this for a normal protocol conversion of a server... Can return nil
	FindSessionsByIp(ip string) []Session
	// Wait until the server is ready for connections. Useful at startup. Returns true when the server is ready
	WaitReady() bool
}

func CreateNewTCPServerInstance

func CreateNewTCPServerInstance(listeningPort int, protocolReceiveve protocol.Implementation,
	connectionType ConnectionType, maxConnections int, timingConfigs ...TCPServerConfiguration) ConnectionInstance

type ConnectionType

type ConnectionType int
const (
	NoLoadBalancer     ConnectionType = 1
	HAProxySendProxyV2 ConnectionType = 2
)

type ErrorType

type ErrorType int
const (
	ErrorConnect         ErrorType = 1
	ErrorSend            ErrorType = 2
	ErrorReceive         ErrorType = 3
	ErrorDisconnect      ErrorType = 4
	ErrorInternal        ErrorType = 5
	ErrorConnectionLimit ErrorType = 6
	ErrorAccept          ErrorType = 7 // error for Server
	ErrorMaxConnections  ErrorType = 8
	ErrorCreateSession   ErrorType = 9  // server only
	ErrorConfiguration   ErrorType = 10 // Error in configuration
	ErrorLogin           ErrorType = 11
)

type FTPFilenameGeneratorFunction

type FTPFilenameGeneratorFunction func([]byte, string) (string, error)

type FileNameGeneration

type FileNameGeneration int
const (
	Default   FileNameGeneration = 1
	TimeStamp FileNameGeneration = 1
)

type Handler

type Handler interface {
	//DataReceived event is triggered whenever the underlying protocol delivered a complete block(file/transmission) of data
	DataReceived(session Session, data []byte, receiveTimestamp time.Time) error
	// Connected event is triggered when connection is established. For client as well as for servers. 	For clients in addition every time the connection had
	// to be reestablished. If this method returns anything other but null, the connections
	// is declined and the thread ends
	Connected(session Session) error
	// Disconnected event is triggered when connection
	// is terminated.
	//For Servers: when the client ends the session
	// For clients: when the only client connection ends (inc.eof)
	Disconnected(session Session)

	// Error is called from async process (Run) when
	// status messages regarding the connection is available
	Error(session Session, typeOfError ErrorType, err error)
}

type ProcessStrategy

type ProcessStrategy string
const PROCESS_STRATEGY_DELETE ProcessStrategy = "delete"

after processing delete the read file

const PROCESS_STRATEGY_DONOTHING ProcessStrategy = "donothing"

after processing do nothing with the file, even though it will get reprocessed on a restart

const PROCESS_STRATEGY_MOVE2SAVE ProcessStrategy = "move2save"

after processing move the file to save folder

type ReadFilePolicy

type ReadFilePolicy int
const (
	ReadAndLeaveFile ReadFilePolicy = 1
	DeleteWhenRead   ReadFilePolicy = 2
	RenameWhenRead   ReadFilePolicy = 3
)

type SecureConnectionOptions

type SecureConnectionOptions struct {
	PublicKey string
}

type Session

type Session interface {
	IsAlive() bool
	Send(msg [][]byte) (int, error)
	Receive() ([]byte, error)
	Close() error
	WaitTermination() error
	RemoteAddress() (string, error)
}

type TCPClientConfiguration

type TCPClientConfiguration struct {
	Timeout                 time.Duration
	Deadline                time.Duration
	FlushBufferTimoutMs     int
	PollInterval            time.Duration
	SessionAfterFirstByte   bool
	SessionInitationTimeout time.Duration
	SourceIP                string
}

func (TCPClientConfiguration) SetSourceIP

func (s TCPClientConfiguration) SetSourceIP(sourceIP string) TCPClientConfiguration

type TCPServerConfiguration

type TCPServerConfiguration struct {
	Timeout                  time.Duration
	Deadline                 time.Duration
	FlushBufferTimoutMs      int
	PollInterval             time.Duration
	SessionAfterFirstByte    bool
	SessionInitiationTimeout time.Duration
	BlackListedIPAddresses   []string
}

Directories

Path Synopsis
example
