#!/usr/bin/env python3
# Copyright 2019 SMILE SA
# Author: Jean LE QUELLEC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# CAN-specific Python libs
import can
import cantools
import isotp
# Common python libs
import time
import datetime as dt
import binascii
import os
# Libs created for this project
from averagetime import AverageTime
class Curf:
    """Helper library to drive a CAN bus, a CAN database and ISO-TP.

    The bus is opened with set_can() and ISO-TP with set_isotp().
    Most arguments are converted inside the methods (int(x, 16) for IDs
    and payloads, float(x) for durations), so they may be given as strings.
    Failed checks raise AssertionError unless stated otherwise.
    """

    def __init__(self):
        """ The class to handle CURF
        You have to instantiate CAN BUS with set_can() method
        """
        self.is_set = False
        self.is_isotp = False
        # Defined up front so that methods needing the database can report
        # a clear error when set_can() was called without one.
        self.db = None
        self.db_default_node = None
        # statusOfDTC bit position (as a string key) -> bit name.
        self.statusOfDTCbits = {"0": "testFailed",
                                "1": "testFailedThisMonitoringCycle",
                                "2": "pendingDTC",
                                "3": "confirmedDTC",
                                "4": "testNotCompletedSinceLastClear",
                                "5": "testFailedSinceLastClear",
                                "6": "testNotCompletedThisMonitoringCycle",
                                "7": "warningIndicatorRequested"}

    def waiting(self, wait_time=1.0):
        """ Wait the given time
        Keyword argument:
        wait_time -- the time to wait in second (default 1.0)
        """
        time.sleep(float(wait_time))

    def remove_char_from(self, nb_of_bits, payload):
        """ Remove the first n char from a payload and return result
        Keyword arguments:
        nb_of_bits -- number of char to remove
        payload -- payload to remove char
        """
        return payload[int(nb_of_bits):]

    def length_must_be(self, nb_of_bytes, payload):
        """ Evaluate the length of a payload
        Two characters of the payload count for one byte.
        Keyword arguments:
        nb_of_bytes -- Wanted size in bytes
        payload -- Payload to check
        Raises AssertionError when the size differs.
        """
        actual = len(payload) // 2
        if actual != int(nb_of_bytes):
            # str() so that an integer nb_of_bytes does not break the message
            raise AssertionError("Bad Length! Wanted Length for Payload: " +
                                 payload + " Is: " + str(nb_of_bytes) +
                                 " But Was: " + str(actual))

    def curf_error_handler(self, error):
        """ Error handler given to the ISO-TP stack
        Re-raise any reported error as an AssertionError.
        """
        raise AssertionError('Curf error happened : %s - %s' %
                             (error.__class__.__name__, str(error)))

    def _database(self):
        """Return the loaded CAN database or raise AssertionError."""
        db = getattr(self, 'db', None)
        if db is None:
            raise AssertionError(
                'No CAN database loaded, give db to set_can() first')
        return db

    def set_can(self, interface, channel, bitrate, db=None, test_name=None):
        """ Set the CAN BUS
        Open the bus, optionally load the database and start logging the
        traffic in <cwd>/outputs/<YYYYMMDD>/<test_name>_<YYYYMMDD>_<HHMMSS>
        Keyword arguments:
        interface -- can interface (socketcan, vector, ...)
        channel -- can channel (can0, vcan0, ...)
        bitrate -- can bitrate (125000, 500000, ...)
        db -- can database (arxml,dbc,kcd,sym,cdd)
        test_name -- Name of test case
        See https://cantools.readthedocs.io/en/latest/#about
        See https://python-can.readthedocs.io/en/master/interfaces.html
        """
        now = dt.datetime.now()
        self.interface = interface
        self.channel = channel
        self.bitrate = bitrate
        self.db_file = db
        self.bus = can.interface.Bus(
            bustype=self.interface, channel=self.channel,
            bitrate=self.bitrate)
        self.logbus = can.ThreadSafeBus(
            interface=self.interface, channel=self.channel)
        if db is not None and db != 'None':
            self.db = cantools.database.load_file(db)
            nodes = self.db.nodes
            # A database may declare no node at all.
            self.db_default_node = nodes[0].name if nodes else None
        else:
            self.db = None
            self.db_default_node = None
        out_dir = os.path.join(os.getcwd(), "outputs",
                               now.strftime("%Y%m%d"))
        # makedirs also creates the "outputs" parent when it is missing.
        os.makedirs(out_dir, exist_ok=True)
        log_name = "%s_%s" % (test_name, now.strftime("%Y%m%d_%H%M%S"))
        self.logger = can.Logger(os.path.join(out_dir, log_name))
        self.notifier = can.Notifier(self.logbus, [self.logger])
        self.is_set = True

    def end_can(self):
        """ Stop the CAN BUS logging
        """
        self.notifier.remove_listener(self.logger)
        # Stop the notifier so its reader thread does not outlive the test.
        self.notifier.stop()
        self.logger.stop()

    def get_message_name_by_signal(self, signal_name):
        """ Search message_name in Database by signal
        Return the name of the first message holding the signal,
        or None when no message holds it.
        Keyword argument:
        signal_name -- The signal name for whom to search for the message
        """
        for message in self._database().messages:
            if any(signal.name == signal_name for signal in message.signals):
                return message.name
        return None

    def get_next_raw_can(self):
        """ Return the next received Can Frame (3 seconds timeout)
        """
        return self.bus.recv(3)

    def get_can_config(self):
        """ Return the CAN configuration
        """
        return self.bus.channel_info

    def get_can_state(self):
        """ Return the CAN bus state
        """
        return self.bus.state

    def set_isotp(self, source, destination,
                  addr_mode='Normal_29bits', test_name=None):
        """ Set ISO-TP protocol
        Keyword Argument:
        source -- Sender address (hexadecimal string)
        destination -- Receiver address (hexadecimal string)
        addr_mode -- Addressing mode (default Normal_29bits)
        test_name -- not used
        Raises AssertionError for an unknown addressing mode.
        """
        supported = ('Normal_29bits', 'Normal_11bits', 'Mixed_11bits',
                     'Mixed_29bits', 'NormalFixed_29bits',
                     'Extended_11bits', 'Extended_29bits')
        if addr_mode not in supported:
            raise AssertionError(
                "Uncompatible addressing\nSee "
                "https://can-isotp.readthedocs.io/en/latest/isotp/"
                "examples.html#different-type-of-addresses")
        src = int(source, 16)
        dst = int(destination, 16)
        if addr_mode in ('Normal_29bits', 'Normal_11bits'):
            params = {'rxid': dst, 'txid': src}
        elif addr_mode == 'Mixed_11bits':
            params = {'rxid': dst, 'txid': src, 'address_extension': 0x99}
        elif addr_mode == 'Mixed_29bits':
            params = {'source_address': src, 'target_address': dst,
                      'address_extension': 0x99}
        elif addr_mode == 'NormalFixed_29bits':
            params = {'target_address': dst, 'source_address': src}
        else:  # Extended_11bits / Extended_29bits
            params = {'rxid': dst, 'txid': src,
                      'source_address': 0x55, 'target_address': 0xAA}
        # The mode names are the names of the isotp.AddressingMode members.
        self.isotp_addr = isotp.Address(
            getattr(isotp.AddressingMode, addr_mode), **params)
        self.isotp_stack = isotp.CanStack(
            bus=self.bus, address=self.isotp_addr,
            error_handler=self.curf_error_handler)
        self.is_isotp = True

    def send_frame(self, frame_id, frame_data):
        """ Send a CAN frame
        Keyword arguments:
        frame_id -- ID to send (hexadecimal string)
        frame_data -- Data to send (hexadecimal string, 2 chars per byte)
        """
        payload = [int(frame_data[i:i + 2], 16)
                   for i in range(0, len(frame_data), 2)]
        frame = can.Message(arbitration_id=int(frame_id, 16), data=payload)
        self.bus.send(frame)

    def _build_signal_message(self, signal_name, value):
        """Return the can.Message carrying signal_name set to value.

        Every other signal of the message is encoded as 0.0.
        Raises AssertionError when the signal is not in the database.
        """
        message_name = self.get_message_name_by_signal(signal_name)
        if message_name is None:
            raise AssertionError('Signal : %s is not in Database' %
                                 (signal_name))
        message = self.db.get_message_by_name(message_name)
        signal_dict = {signal.name: 0.0 for signal in message.signals}
        signal_dict[signal_name] = float(value)
        return can.Message(arbitration_id=message.frame_id,
                           data=message.encode(signal_dict))

    def send_signal(self, signal_name, value):
        """ Send a CAN signal from Database
        The other signals of the same message are sent with value 0.
        Keyword arguments:
        signal_name -- Name of the signal to send
        value -- Value of the signal to send
        """
        message = self._build_signal_message(signal_name, value)
        print(message)
        self.bus.send(message)

    def check_msg(self, msg_name, time_out,
                  check_not_received, node_name=None):
        """Check the reception of given message
        with the given time out value
        Keyword arguments:
        msg_name -- message expected to be received
        time_out -- timeout value in second for the reception
        check_not_received -- the parameters to decide
                              either check reception or
                              check no-reception
                              check_not_received = True if
                              we want to check the no-reception
                              (boolean or 'True'/'False' string)
        node_name -- Node ID (optional)
        """
        flag = str(check_not_received).strip().lower()
        if flag not in ('true', 'false'):
            # An unknown flag used to make the check pass silently.
            raise AssertionError(
                'check_not_received must be True or False but was: %s'
                % (check_not_received))
        expect_absent = flag == 'true'
        timeout_s = float(time_out)
        if node_name is None or node_name == 'None':
            node_name = self.db_default_node
        tester = cantools.tester.Tester(
            dut_name=node_name, database=self.db, can_bus=self.bus)
        res = tester.expect(msg_name, signals=None, timeout=timeout_s)
        print(res)
        if res is not None and expect_absent:
            raise AssertionError('Message : %s was received in:%g Seconds'
                                 % (msg_name, timeout_s))
        if res is None and not expect_absent:
            raise AssertionError('No Message : %s received in:%g Seconds'
                                 % (msg_name, timeout_s))

    def check_frame(self, expect_id, expect_data, timeout, node_name=None):
        """Check the reception of given frame
        with the given time out value
        Frames with another ID are ignored until the timeout is reached.
        Keyword arguments:
        expect_id -- frame expected ID to be received (hexadecimal string)
        expect_data -- frame expected data to be received
                       (hexadecimal string), 'ANY' to accept any data or
                       'NoReception' to check the frame is not received
        timeout -- timeout value in second for the reception
        node_name -- not used
        """
        wanted_id = int(expect_id, 16)
        any_data = expect_data == "ANY"
        no_reception = expect_data == "NoReception"
        wanted_data = None
        if not (any_data or no_reception):
            wanted_data = int(expect_data, 16)
        end_time = time.time() + float(timeout)
        while True:
            # Only wait for what is left of the timeout.
            frame = self.bus.recv(max(end_time - time.time(), 0.0))
            print(frame)
            if frame is None:
                if no_reception:
                    return
                raise AssertionError('No Frame was received')
            received_id = '0X%X' % frame.arbitration_id
            # from_bytes also copes with an empty payload (value 0).
            received_data = int.from_bytes(frame.data, 'big')
            if frame.arbitration_id != wanted_id:
                if time.time() <= end_time:
                    continue
                if no_reception:
                    return
                raise AssertionError(
                    'Frame : %s received with bad ID: %s\n'
                    'With Data: %s instead of ID: %s'
                    % (frame, received_id, received_data,
                       '0X%X' % wanted_id))
            if any_data or (wanted_data is not None and
                            received_data == wanted_data):
                return
            if no_reception:
                raise AssertionError('Frame : %s was received with ID: %s'
                                     % (frame, received_id))
            raise AssertionError(
                'Frame : %s received with good ID: %s\n'
                'But with Data: %s instead of: %s'
                % (frame, received_id, received_data, wanted_data))

    @staticmethod
    def _signal_value_matches(actual, expected):
        """Tell if a decoded signal value equals the expected one.

        The expectation may be a string, so a numeric comparison is tried
        before falling back to comparing the string forms.
        """
        if actual == expected:
            return True
        try:
            return float(actual) == float(expected)
        except (TypeError, ValueError):
            return str(actual) == str(expected)

    def check_signal(self, signal_name, expect_value,
                     time_out, node_name=None):
        """Check the reception of given signal
        with the given time out value
        Keyword arguments:
        signal_name -- signal expected name to be received
        expect_value -- signal expected value to be received or
                        'NoReception' to check the message is not received
        time_out -- timeout value in second for the reception
        node_name -- Node ID (optional)
        """
        if node_name is None or node_name == 'None':
            node_name = self.db_default_node
        message_name = self.get_message_name_by_signal(signal_name)
        if message_name is None:
            raise AssertionError('Signal : %s was not in database' %
                                 (signal_name))
        tester = cantools.tester.Tester(
            dut_name=node_name, database=self.db, can_bus=self.bus)
        res = tester.expect(
            message_name, signals=None, timeout=float(time_out))
        if res is None:
            if expect_value == 'NoReception':
                return
            raise AssertionError('Signal : %s was not received\n'
                                 'in timeout: %f' %
                                 (signal_name, float(time_out)))
        if signal_name not in res:
            return
        value = res[signal_name]
        if self._signal_value_matches(value, expect_value):
            return
        if expect_value == 'NoReception':
            raise AssertionError('Signal : %s was received' % (signal_name))
        raise AssertionError('Signal : %s was received but\n'
                             'value is: %s instead of: %s'
                             % (signal_name, value, expect_value))

    def check_period(self, id_frame, expect_period, times):
        """Check the periodicity of given frame ID
        The measured average period must be within 10% of the expectation.
        Keyword arguments:
        id_frame -- frame expected ID to be received (hexadecimal string,
                    with or without 0x prefix)
        expect_period -- expected period in second
        times -- number of measured frame
        Raises RuntimeError when no frame is received or the period is wrong.
        """
        period_s = float(expect_period)
        wanted_count = int(times)
        wanted_id = int(id_frame, 16)
        end_time = time.time() + period_s * wanted_count + 1
        self.avrTime = AverageTime()
        self.avrTime.clean_list()
        count = 0
        # Stop as soon as enough frames are measured instead of spinning
        # until the deadline.
        while count < wanted_count and time.time() < end_time:
            frame = self.bus.recv(period_s + 1)
            if frame is None:
                continue
            if frame.arbitration_id == wanted_id:
                self.avrTime.put_tick()
                count += 1
        if count == 0:
            raise RuntimeError("No Message with ID:%s Received" % (id_frame))
        period = self.avrTime.get_average()
        up_bound = period_s * 1.1
        down_bound = period_s * 0.9
        if not (down_bound < period < up_bound):
            raise RuntimeError('The period is wrong, expected period is %f\n'
                               'but the real period is %f'
                               % (period_s, float(period)))

    def send_periodic_message(self, message_to_send, period, data=None):
        """Send a message with the given periodicity
        Keyword arguments:
        message_to_send -- name of the message
                           (relative to database) to send
        period -- periodicity in second
        data -- value of the single data byte to send (default 0)
        """
        if data is None or data == 'None':
            data = '0'
        message = next((candidate for candidate in self._database().messages
                        if candidate.name == message_to_send), None)
        if message is None:
            raise AssertionError('Message : %s is not in Database' %
                                 (message_to_send))
        msg = can.Message(
            arbitration_id=message.frame_id, data=bytearray([int(data)]))
        print(msg)
        self.bus.send_periodic(msg, float(period))

    def send_periodic_signal(self, signal_name, signal_value, period):
        """Send a signal with the given periodicity
        The message is the one send_signal() would use: the first message
        of the database holding the signal, other signals set to 0.
        Keyword arguments:
        signal_name -- name of the signal
                       (relative to database) to send
        signal_value -- signal value to send
        period -- periodicity in second
        """
        msg = self._build_signal_message(signal_name, signal_value)
        print(msg)
        self.bus.send_periodic(msg, float(period))

    def stop_periodic_message(self):
        """Stop the sending of all periodic messages, signals and frames"""
        self.bus.stop_all_periodic_tasks()

    def stop_bus(self):
        """Stop the CAN BUS"""
        self.bus.shutdown()

    def flush_bus(self):
        """Flush the TX buffer"""
        self.bus.flush_tx_buffer()

    def send_diagnostic_request(self, data_to_send, address_type='Physical'):
        """Send a diagnostic (ISO-TP) request
        set_isotp() must be already called
        Keyword arguments:
        data_to_send -- The payload to send (hexadecimal string)
        address_type -- Addressing type (default Physical)
        """
        data = bytes.fromhex(data_to_send)
        print(data)
        print(address_type)
        if address_type == 'Functional':
            self.isotp_stack.send(data, isotp.TargetAddressType.Functional)
        else:
            self.isotp_stack.send(data)
        # Run the stack until the whole payload is transmitted.
        while self.isotp_stack.transmitting():
            self.isotp_stack.process()
            time.sleep(self.isotp_stack.sleep_time())

    def _wait_isotp_response(self, timeout):
        """Return the next ISO-TP payload as hex text, None on timeout.

        A response "7F xx 78" is skipped and restarts the timeout.
        """
        wait_s = float(timeout)
        end_time = time.time() + wait_s
        while time.time() < end_time:
            self.isotp_stack.process()
            payload = None
            if self.isotp_stack.available():
                payload = self.isotp_stack.recv()
            if payload is None:
                # Let the stack decide how long to idle instead of spinning.
                time.sleep(self.isotp_stack.sleep_time())
                continue
            try:
                recv_data = payload.hex()
            except AttributeError:
                # Payload has no hex() method: keep it unchanged.
                recv_data = payload
            print(recv_data)
            if recv_data[0:2] == "7f" and recv_data[4:6] == "78":
                print("7F XX 78")
                end_time = time.time() + wait_s
                continue
            return recv_data
        return None

    def check_diag_request(self, expect_reponse_data,
                           timeout_value, exact_or_contain):
        """Check the reception of diagnostic (ISO-TP) response
        set_isotp() must be already called
        The comparison is not case sensitive.
        Keyword arguments:
        expect_reponse_data -- The wanted payload to check, 'ANY' to accept
                               any response or 'NoReception' to check that
                               no response is received
        timeout_value -- timeout value in second for the reception
        exact_or_contain -- keyword to check pieces of response
                            (EXACT, CONTAIN, START or NOTSTART)
        """
        expects_nothing = expect_reponse_data == "NoReception"
        recv_data = self._wait_isotp_response(timeout_value)
        if recv_data is None:
            if expects_nothing:
                return
            raise AssertionError("Error CAN TimeOut Reached")
        recv_data = recv_data.upper()
        if expect_reponse_data == "ANY":
            return
        if expects_nothing:
            matched = False
        else:
            wanted = expect_reponse_data.upper()
            if exact_or_contain == "EXACT":
                matched = recv_data == wanted
            elif exact_or_contain == "CONTAIN":
                matched = wanted in recv_data
            elif exact_or_contain == "START":
                matched = recv_data.startswith(wanted)
            elif exact_or_contain == "NOTSTART":
                matched = not recv_data.startswith(wanted)
            else:
                raise AssertionError("BAD ARGUMENTS")
        if not matched:
            raise AssertionError(("The diagnostic reponse "
                                  "expect to be %s but was %s.")
                                 % (expect_reponse_data, str(recv_data)))

    def get_next_isotp_frame(self, timeout='3'):
        """Return the next diagnostic (ISO-TP) frame
        set_isotp() must be already called
        Keyword arguments:
        timeout -- timeout value in second for the reception
        Raises AssertionError when nothing is received in time.
        """
        recv_data = self._wait_isotp_response(timeout)
        if recv_data is None:
            raise AssertionError("No ISO-TP Frame received in Timeout")
        return recv_data

    def check_snapshot(self, DtcSnapshot, DtcMaskRecord, DidValue):
        """Check the DTCmaskRecord and his value for a given snapshot
        The snapshot must contain DtcMaskRecord directly followed by
        DidValue; the comparison is not case sensitive.
        Keyword arguments:
        DtcSnapshot -- The snapshot to check
        DtcMaskRecord -- Wanted DtcMaskRecord
        DidValue -- Wanted DID value
        """
        snapshot = DtcSnapshot.upper()
        wanted = DtcMaskRecord + DidValue
        if wanted.upper() not in snapshot:
            raise AssertionError(wanted + " Not Found in " + snapshot)

    def check_statusofdtc(self, bit_name, bit_value, snapshot):
        """Check the status of DTC by bit and his value for a given snapshot
        The status is read from the 6th byte of the snapshot.
        Keyword arguments:
        bit_name -- Name of the statusOfDTC bit (example: testFailed)
        bit_value -- Wanted bit value (0 or 1)
        snapshot -- The snapshot to check (hexadecimal string
                    starting with 59)
        """
        if snapshot[0:2] != "59":
            raise AssertionError("SnapShot: " + snapshot +
                                 " does not contain DTC\n")
        bit_index = next((int(key)
                          for key, name in self.statusOfDTCbits.items()
                          if name == bit_name), None)
        if bit_index is None:
            raise AssertionError("Bit Name Must Be One of Those:\n" +
                                 ' \n'.join(str(self.statusOfDTCbits[x])
                                            for x in
                                            sorted(self.statusOfDTCbits)) +
                                 "\nBut Was:\n" + str(bit_name))
        payload = bytes.fromhex(snapshot)
        if len(payload) < 6:
            raise AssertionError("SnapShot: " + snapshot +
                                 " is too short to hold a statusOfDTC\n")
        status = payload[5]
        print('{:08b}'.format(status))
        actual_bit = str((status >> bit_index) & 1)
        if actual_bit != str(bit_value).strip():
            raise AssertionError(
                "Wanted " + bit_name + " Value is: " + str(bit_value) +
                " But was: " + actual_bit)

    def get_seedkey(self, Seed, Constant_1, Constant_2):
        """Return a Key generated by a given seed and 2 constants
        This is a "dumb" example only adding constant to seed.
        You must implement your own security handshake.
        The key is returned as 8 hexadecimal characters; the sum wraps
        around on 32 bits.
        Keyword arguments:
        Seed -- The seed to generate key from (hexadecimal string)
        Constant_1 -- The first constant (hexadecimal string)
        Constant_2 -- The second constant (hexadecimal string)
        """
        total = int(Seed, 16) + int(Constant_1, 16) + int(Constant_2, 16)
        # Keep the key on 4 bytes even when the addition overflows.
        key = (total & 0xFFFFFFFF).to_bytes(4, byteorder='big')
        return key.hex()