#!/usr/bin/env python
from __future__ import print_function
"""
Author: Pierre SCHNIZER
Date: 27. January 2014
"""
import exceptions
import sys
import socket
import time
import datetime
import re
# Shortcut used by MeasureTime to take its time stamps.
_my_now = datetime.datetime.now
# NOTE(review): both patterns below are empty, so match() always succeeds but
# yields no capture groups, while ReadStatus and GetValue read group 0 of the
# match. Presumably the original expressions were lost -- TODO restore them.
_status_match = re.compile('')
_answer_match = re.compile('')
# NOTE(review): GetValue additionally uses a pattern named `_value_match`,
# which is not defined in the visible part of this file -- TODO confirm where
# it is supposed to come from.
class FMTClientError(Exception):
    """
    Base class of the FMT client errors.

    All errors signaled by the FMT client shall be derived from this one, so
    that a caller can catch every client specific failure with a single
    except clause.
    """
    # Derived from the builtin Exception: it is the very same class as
    # exceptions.Exception, but does not require the `exceptions` module.
class ServerStatusError(FMTClientError):
    """
    Signals that the fmt server answered with an error status.
    """
class ProtocolError(FMTClientError):
    """
    Signals that the communication did not follow the protocol.
    """
class ServerResponseTooLong(ProtocolError):
    """
    Signals a server message that was too long.

    The response exceeded the length the client was prepared to read and
    therefore did not match the protocol.
    """
class MeasureTime:
    """
    Utility for measuring some duration.

    Call Tic() to start, Tac() to stop and UsedTime() to get the difference
    of the two time stamps.
    """
    def __init__(self):
        self._tic = None
        self._tac = None

    def Tic(self):
        """
        Start a new measurement; a stop time of an earlier one is discarded.
        """
        self._tac = None
        self._tic = _my_now()

    def Tac(self):
        """
        Record the stop time of the measurement.
        """
        self._tac = _my_now()

    def UsedTime(self):
        """
        Return the stop time minus the start time.

        Raises AssertionError if Tic() or Tac() has not been called yet.
        """
        # Raised explicitly instead of using the assert statement: assert is
        # removed when python runs with -O and the subtraction below would
        # then fail with a far less helpful TypeError.
        if self._tic is None:
            raise AssertionError("No start time: Tic() was not called")
        if self._tac is None:
            raise AssertionError("No stop time: Tac() was not called")
        return self._tac - self._tic
class FMTClient:
    """
    A client for the fmt server.

    The client talks to the server over a TCP socket. Values are written with
    SendCommand() and read back with GetValue(), GetIntValue() or
    GetDoubleValue(). Everything sent and received is echoed to stdout.

    NOTE(review): the request patterns in SendCommand() and GetValue() are
    empty strings, so formatting the request raises TypeError. Presumably the
    original patterns were lost -- TODO restore them.
    """
    def __init__(self, host_name = None, port_number = None, timeout = None):
        """
        host_name   ... host name on which the server runs [x86-14.gsi.de]
        port_number ... port number of the server [1234]
        timeout     ... socket timeout in seconds [0.1]
        """
        # Set before anything can raise, so that Close() (called by __del__)
        # always finds the attribute.
        self._socket = None
        self._ip_address = None

        if host_name is None:
            host_name = "x86-14.gsi.de"
        if port_number is None:
            port_number = 1234
        if timeout is None:
            timeout = .1

        port_number = int(port_number)
        host_name = str(host_name)
        timeout = float(timeout)

        self._timeout = timeout
        self._host_name = host_name
        self._port_number = port_number

    def Close(self):
        """
        Close the socket if it is open. Calling it repeatedly is harmless.
        """
        # getattr: the attribute is missing on an instance whose __init__
        # never ran.
        sock = getattr(self, "_socket", None)
        if sock is not None:
            # Forget the socket first so a failing close() is not retried.
            self._socket = None
            sock.close()

    def __del__(self):
        self.Close()

    def GetIPAddress(self):
        """
        Resolve the host name, store the address and return it.
        """
        self._ip_address = socket.gethostbyname(self._host_name)
        return self._ip_address

    def _OpenPort(self):
        """
        Create the socket, apply the timeout and connect to the server.
        """
        if self._ip_address is None:
            self.GetIPAddress()
        assert self._ip_address is not None

        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(self._timeout)
            # Connect to the address resolved above; connecting to the host
            # name would trigger a second name lookup.
            sock.connect((self._ip_address, self._port_number))
        except Exception:
            # Do not keep (or leak) a socket which never got connected.
            sock.close()
            raise
        self._socket = sock

    def OpenPort(self):
        """
        Open the connection unless it is already open.
        """
        if self._socket is None:
            self._OpenPort()
        assert self._socket is not None

    def _DisplayServerResponse(self, result):
        print("Server Response '%s'" %(result,))
        return result

    def _ClientSends(self, result):
        print("Client Sends '%s'" %(result,))
        return result

    def _RecvExact(self, n_bytes):
        """
        Read exactly n_bytes from the socket and return them.

        A single recv() on a stream socket may return fewer bytes than
        requested, thus it is repeated until all bytes have arrived.

        Raises ProtocolError if the connection is closed before that.
        """
        chunks = []
        remaining = n_bytes
        while remaining > 0:
            chunk = self._socket.recv(remaining)
            if not chunk:
                msg2 = "Connection closed while %d of %d bytes were missing"
                raise ProtocolError(msg2 % (remaining, n_bytes))
            chunks.append(chunk)
            remaining -= len(chunk)
        return "".join(chunks)

    def _ReadAnswer(self):
        """
        Read a single answer byte by byte and return it.

        Leading white space is skipped; reading stops as soon as the text
        ends with '/>'.

        Raises ServerResponseTooLong if no '/>' showed up within 100 bytes
        (skipped white space counts too).
        """
        result = ""
        started = False
        n_bytes_to_read = 100
        for i in range(n_bytes_to_read):
            try:
                a_char = self._socket.recv(1)
            except Exception:
                # Show what was received so far, then let the error pass on.
                print ("Line %d: result was ->%s<-" % (89, result))
                raise

            if not started and a_char.isspace():
                continue

            started = True
            result += a_char
            if result[-2:] == '/>':
                break
        else:
            msg2 = "Failed while reading %d bytes. Recieved up to now ->%s<-"
            msg = msg2 % (n_bytes_to_read, result)
            raise ServerResponseTooLong(msg)

        self._DisplayServerResponse(result)
        return result

    def ReadStatus(self):
        """
        Waits for the status string and returns the integer value

        The first group of the status pattern is parsed as hexadecimal number.

        Raises ProtocolError if the answer does not match the status pattern.
        """
        result = self._ReadAnswer()
        m = _status_match.match(result)
        if m is None:
            # Going on with m == None would only end in an AttributeError.
            raise ProtocolError("Status ->%s<- not parsable" %(result,))

        status_string = m.groups()[0]
        return int(status_string, 16)

    def CheckStatus(self):
        """
        Reads status and checks that it is zero...

        Raises ServerStatusError, carrying the status, if it is not zero.
        """
        status = self.ReadStatus()
        if status == 0:
            return
        raise ServerStatusError(status)

    def SendCommand(self, cmd, value):
        """
        Set some value

        cmd   ... path to the variable
        value ... the value

        Raises ServerStatusError if the server does not answer with status 0.
        """
        # NOTE(review): empty pattern, see the class documentation.
        pattern = ''
        msg_string = pattern %(cmd, value)
        self._ClientSends(msg_string)
        # sendall: send() is allowed to transmit only a part of the message.
        self._socket.sendall(msg_string)
        self.CheckStatus()

    def GetValue(self, cmd):
        """
        Get the value to be found at location cmd

        Returns the value as string: the first group of the value pattern.

        Raises ServerStatusError if the request is not answered with status 0,
        ServerResponseTooLong if only white space follows the status and
        ProtocolError if the answer does not match the expected patterns.
        """
        # NOTE(review): empty pattern, see the class documentation.
        pattern = ''
        msg_string = pattern %(cmd, )
        self._ClientSends(msg_string)
        self._socket.sendall(msg_string)

        try:
            self.CheckStatus()
        except Exception:
            print("sending command '%s' failed" %(cmd,))
            raise

        # Get the start string ... 20 bytes long ... which tells
        # how much more to come ....
        n_bytes_space = 128
        for i in range(n_bytes_space):
            try:
                test_char = self._socket.recv(1)
            except Exception:
                # Report the iteration which failed, not the loop limit.
                print ("Occured on iteration %d" %(i,))
                raise
            if not test_char.isspace():
                break
        else:
            msg = "Did not expected %d space" %(n_bytes_space,)
            raise ServerResponseTooLong(msg)

        # First one already read
        start_string = test_char + self._RecvExact(19)
        m = _answer_match.match(start_string)
        if m is None:
            msg = "start_string ->%s<- did not match" %(start_string,)
            raise ProtocolError(msg)

        length_string = m.groups()[0]
        length = int(length_string, 16)

        # check if according to syntax and return result
        end_string = self._RecvExact(length)
        # NOTE(review): _value_match is not defined in the visible part of
        # this file -- TODO confirm where it is supposed to come from.
        m = _value_match.match(end_string)
        if m is None:
            msg = "end_string ->%s<- did not match!" %(end_string,)
            raise ProtocolError(msg)

        value_string = m.groups()[0]
        self._DisplayServerResponse(start_string + end_string)
        return value_string

    def GetIntValue(self, cmd):
        """
        Get the value at location cmd, parsed as hexadecimal integer.
        """
        value_string = self.GetValue(cmd)
        return int(value_string, 16)

    def GetDoubleValue(self, cmd):
        """
        Get the value at location cmd, converted to float.
        """
        value_string = self.GetValue(cmd)
        return float(value_string)
def run(**kws):
    """
    Test the connection to the fmt server.

    Opens the connection, reads the first status and the server real time
    value and then writes and reads back a set of power converter variables.
    The connection is closed on the way out, also if one of the steps fails.

    Key words
    host_name   ... host name on which the server runs [x86-14.gsi.de]
    port_number ... port number of the server [1234]
    timeout     ... time to wait for timeout (in seconds) [0.1]
    """
    mt = MeasureTime()
    client = FMTClient(**kws)
    try:
        # The host name is resolved before the timer is started.
        client.GetIPAddress()
        mt.Tic()
        client.OpenPort()
        mt.Tac()
        print("Time used to open port", mt.UsedTime())

        mt.Tic()
        msg = None
        try:
            msg = client.ReadStatus()
        except socket.timeout:
            msg = "First status not recieved!"
        mt.Tac()
        print("First status: ", msg, "required %s" % (mt.UsedTime()))

        mt.Tic()
        server_real_time_val = client.GetIntValue("TOP:SERVER:REALTIME")
        mt.Tac()
        print ("Server real time value = %s (Required %s time)" % (server_real_time_val, mt.UsedTime()))

        # The values read back below are not stored: the client itself
        # prints every server response.
        var_name = "TOP:PC:CURRENT:POSITIVE_LIMIT"
        client.GetDoubleValue(var_name)
        client.SendCommand(var_name, 20e3)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:CURRENT:NEGATIVE_LIMIT"
        client.SendCommand(var_name, -10)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:RAMP_RATE_UP"
        client.SendCommand(var_name, 27e3)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:RAMP_RATE_DOWN"
        client.SendCommand(var_name, -27e3)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:RAMP_DATA:SIZE"
        client.SendCommand(var_name, 2)
        client.GetIntValue(var_name)

        var_name = "TOP:PC:RAMP_DATA:INDEX"
        client.SendCommand(var_name, 0)
        client.GetIntValue(var_name)

        var_name = "TOP:PC:RAMP_DATA:DELAY"
        client.SendCommand(var_name, 1)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:RAMP_DATA:NEXT_CURRENT"
        client.SendCommand(var_name, 100)

        var_name = "TOP:PC:RAMP_DATA:DELAY"
        client.SendCommand(var_name, 2)
        client.GetDoubleValue(var_name)

        var_name = "TOP:PC:RAMP_DATA:NEXT_CURRENT"
        client.SendCommand(var_name, 2000)

        # NOTE(review): assumes that reading back DELAY and CURRENT belongs
        # to the loop over the index -- confirm.
        for idx in range(2):
            client.SendCommand("TOP:PC:RAMP_DATA:INDEX", idx)
            client.GetDoubleValue("TOP:PC:RAMP_DATA:DELAY")
            client.GetDoubleValue("TOP:PC:RAMP_DATA:CURRENT")

        client.SendCommand("TOP:PC:LOAD:INDUCTANCE", 0.55e-3)
        client.SendCommand("TOP:PC:LOAD:RESISTANCE", 155e-6)
        client.SendCommand("TOP:PC:LOAD:MAXIMUM_CURRENT", 17e3)
        client.SendCommand("TOP:PC:LOAD:NOMINAL_CURRENT", 13.1e3)
        client.SendCommand("TOP:PC:LOAD:THRESHOLD_CURRENT", 10e3)
        client.SendCommand("TOP:PC:LOAD:INDUCTANCE_CORRECTION:LINEAR", 0.0)
        client.SendCommand("TOP:PC:LOAD:INDUCTANCE_CORRECTION:QUADRATIC", -0.296)
        client.SendCommand("TOP:PC:LOAD:INDUCTANCE_CORRECTION:CUBIC", -0.077)

        # The commands below should be only executed on a real power converter if
        # you know what you are doing
        ## var_name = "TOP:PC:CURRENT_EPS_ABSOLUTE"
        ## client.GetDoubleValue(var_name)
        ## var_name = "TOP:PC:VOLTAGE_EPS_ABSOLUTE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT_RAMP_EPS_ABSOLUTE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT_RAMP_EPS_RELATIVE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:VOLTAGE_RAMP_EPS_ABSOLUTE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:VOLTAGE_RAMP_EPS_RELATIVE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT:VALUE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT:GAIN"
        ## client.SendCommand(var_name, 2.2e3)
        ##
        ## var_name = "TOP:PC:CURRENT:POSITIVE_LIMIT"
        ## client.SendCommand(var_name, 18e3)
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT:NEGATIVE_LIMIT"
        ## client.SendCommand(var_name, -100)
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:CURRENT:RAMP_RATE_POSITIVE_LIMIT"
        ## client.SendCommand(var_name, 30e3)
        ##
        ## var_name = "TOP:PC:CURRENT:RAMP_RATE_NEGATIVE_LIMIT"
        ## client.SendCommand(var_name, -30e3)
        ##
        ## var_name = "TOP:PC:VOLTAGE:VALUE"
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:VOLTAGE:GAIN"
        ## client.SendCommand(var_name, 25)
        ##
        ##
        ## var_name = "TOP:PC:VOLTAGE:POSITIVE_LIMIT"
        ## client.SendCommand(var_name, 20)
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:VOLTAGE:NEGATIVE_LIMIT"
        ## client.SendCommand(var_name, -20)
        ## client.GetDoubleValue(var_name)
        ##
        ## var_name = "TOP:PC:VOLTAGE:RAMP_RATE_POSITIVE_LIMIT"
        ## client.SendCommand(var_name, 3000)
        ##
        ## var_name = "TOP:PC:VOLTAGE:RAMP_RATE_NEGATIVE_LIMIT"
        ## client.SendCommand(var_name, -3000)
    finally:
        # Release the socket even if one of the steps above raised.
        client.Close()
if __name__ == '__main__':
    # Hosts used for earlier runs (called without a port number):
    #   asl731.acc.gsi.de
    #   scuxl0063.acc.gsi.de
    run(port_number=1235, host_name="scuxl0117.acc.gsi.de")