232 lines
9.2 KiB
Python
232 lines
9.2 KiB
Python
# Copyright (C) 2017 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
#
|
|
|
|
import datetime
|
|
import time
|
|
|
|
from xml.dom import minidom
|
|
|
|
import diagnostic_sensors as s
|
|
import vhal_consts_2_0 as c
|
|
|
|
from diagnostic_builder import DiagnosticEventBuilder
|
|
|
|
# interval of generating driving information
|
|
SAMPLE_INTERVAL_SECONDS = 0.5
|
|
|
|
RPM_LOW = 1000
|
|
RPM_HIGH = 3000
|
|
|
|
REVERSE_DURATION_SECONDS = 10
|
|
PARK_DURATION_SECONDS = 10
|
|
|
|
# roughly 5 miles/hour
|
|
REVERSE_SPEED_METERS_PER_SECOND = 2.3
|
|
|
|
UTC_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
|
|
|
|
# Diagnostics property constants. The value is based on the record from a test drive
|
|
FUEL_SYSTEM_STATUS_VALUE = 2
|
|
AMBIENT_AIR_TEMPERATURE_VALUE = 21
|
|
ENGINE_COOLANT_TEMPERATURE_VALUE = 75
|
|
|
|
|
|
def speed2Gear(speed):
|
|
"""
|
|
Get the current gear based on speed of vehicle. The conversion may not be strictly real but
|
|
are close enough to a normal vehicle. Assume the vehicle is moving forward.
|
|
"""
|
|
if speed < 4.4:
|
|
# 0 - 10 mph
|
|
return c.VEHICLEGEAR_GEAR_1
|
|
elif speed < 11.2:
|
|
# 10 - 25 mph
|
|
return c.VEHICLEGEAR_GEAR_2
|
|
elif speed < 20.1:
|
|
# 25 - 45 mph
|
|
return c.VEHICLEGEAR_GEAR_3
|
|
elif speed < 26.8:
|
|
# 45 - 60 mph
|
|
return c.VEHICLEGEAR_GEAR_4
|
|
else:
|
|
# > 60 mph
|
|
return c.VEHICLEGEAR_GEAR_5
|
|
|
|
class GpxFrame(object):
|
|
"""
|
|
A class representing a track point from GPX file
|
|
"""
|
|
def __init__(self, trkptDom):
|
|
timeElements = trkptDom.getElementsByTagName('time')
|
|
if timeElements:
|
|
# time value in GPX is in UTC format: YYYY-MM-DDTHH:MM:SS, need to parse it
|
|
self.datetime = datetime.datetime.strptime(timeElements[0].firstChild.nodeValue,
|
|
UTC_TIME_FORMAT)
|
|
speedElements = trkptDom.getElementsByTagName('speed')
|
|
if speedElements:
|
|
self.speedInMps = float(speedElements[0].firstChild.nodeValue)
|
|
|
|
class DrivingInfoGenerator(object):
|
|
"""
|
|
A class that generates driving information like speed, odometer, rpm, diagnostics etc. It
|
|
takes a GPX file which describes a real route that consists of a sequence of location data,
|
|
and then derive driving information from those data.
|
|
|
|
One assumption is that it is automatic transmission car, so that current gear does not
|
|
necessarily match selected gear.
|
|
"""
|
|
|
|
def __init__(self, gpxFile, vhal):
|
|
self.gpxDom = minidom.parse(gpxFile)
|
|
# Speed of vehicle (meter / second)
|
|
self.speedInMps = 0
|
|
# Fixed RPM with average value during driving
|
|
self.rpm = RPM_LOW
|
|
# Odometer (kilometer)
|
|
self.odometerInKm = 0
|
|
# Gear selection
|
|
self.selectedGear = c.VEHICLEGEAR_GEAR_PARK
|
|
# Current gear
|
|
self.currentGear = c.VEHICLEGEAR_GEAR_PARK
|
|
# Timestamp while driving on route defined in GPX file
|
|
self.datetime = 0
|
|
# Get Diagnostics live frame property configure
|
|
vhal.getConfig(c.VEHICLEPROPERTY_OBD2_LIVE_FRAME)
|
|
self.liveFrameConfig = vhal.rxMsg()
|
|
|
|
|
|
def _generateFrame(self, listener):
|
|
"""
|
|
Handle newly generated vehicle property with listener
|
|
"""
|
|
listener.handle(c.VEHICLEPROPERTY_PERF_VEHICLE_SPEED, 0, self.speedInMps, "PERF_VEHICLE_SPEED")
|
|
listener.handle(c.VEHICLEPROPERTY_ENGINE_RPM, 0, self.rpm, "ENGINE_RPM")
|
|
listener.handle(c.VEHICLEPROPERTY_PERF_ODOMETER, 0, self.odometerInKm, "PERF_ODOMETER")
|
|
listener.handle(c.VEHICLEPROPERTY_CURRENT_GEAR, 0, self.currentGear, "CURRENT_GEAR")
|
|
listener.handle(c.VEHICLEPROPERTY_OBD2_LIVE_FRAME, 0,
|
|
self._buildDiagnosticLiveFrame(), "DIAGNOSTIC_LIVE_FRAME")
|
|
|
|
def _buildDiagnosticLiveFrame(self):
|
|
"""
|
|
Build a diagnostic live frame with a few sensor fields set
|
|
"""
|
|
builder = DiagnosticEventBuilder(self.liveFrameConfig)
|
|
builder.setStringValue('')
|
|
builder.addIntSensor(s.DIAGNOSTIC_SENSOR_INTEGER_FUEL_SYSTEM_STATUS,
|
|
FUEL_SYSTEM_STATUS_VALUE)
|
|
builder.addIntSensor(s.DIAGNOSTIC_SENSOR_INTEGER_AMBIENT_AIR_TEMPERATURE,
|
|
AMBIENT_AIR_TEMPERATURE_VALUE)
|
|
builder.addFloatSensor(s.DIAGNOSTIC_SENSOR_FLOAT_ENGINE_COOLANT_TEMPERATURE,
|
|
ENGINE_COOLANT_TEMPERATURE_VALUE)
|
|
builder.addFloatSensor(s.DIAGNOSTIC_SENSOR_FLOAT_ENGINE_RPM, self.rpm)
|
|
builder.addFloatSensor(s.DIAGNOSTIC_SENSOR_FLOAT_VEHICLE_SPEED, self.speedInMps)
|
|
return builder.build()
|
|
|
|
def _generateFromGpxFrame(self, gpxFrame, listener):
|
|
"""
|
|
Generate a sequence of vehicle property frames from current track point to the next one.
|
|
The frequency of frames are pre-defined.
|
|
|
|
Some assumptions here:
|
|
- Two track points are very close to each other (e.g. 1 second driving distance)
|
|
- It is a straight line between two track point
|
|
- Speed is changing linearly between two track point
|
|
|
|
Given the info:
|
|
timestamp1 : speed1
|
|
timestamp2 : speed2
|
|
|
|
Vehicle properties in each frame are derived like this:
|
|
- Speed is calculated based on linear model
|
|
- Odometer is calculated based on speed and time
|
|
- RPM will be set to a low value if not accelerating, otherwise set to a high value
|
|
- Current gear will be set according to speed
|
|
"""
|
|
|
|
duration = (gpxFrame.datetime - self.datetime).total_seconds()
|
|
speedIncrement = (gpxFrame.speedInMps - self.speedInMps) / duration * SAMPLE_INTERVAL_SECONDS
|
|
self.rpm = RPM_HIGH if speedIncrement > 0 else RPM_LOW
|
|
|
|
timeElapsed = 0
|
|
while timeElapsed < duration:
|
|
self._generateFrame(listener)
|
|
if timeElapsed + SAMPLE_INTERVAL_SECONDS < duration:
|
|
self.odometerInKm += (self.speedInMps + speedIncrement / 2.0) * SAMPLE_INTERVAL_SECONDS / 1000
|
|
self.speedInMps += speedIncrement
|
|
time.sleep(SAMPLE_INTERVAL_SECONDS)
|
|
else:
|
|
timeLeft = duration - timeElapsed
|
|
self.odometerInKm += (self.speedInMps + gpxFrame.speedInMps) / 2.0 * timeLeft / 1000
|
|
self.speedInMps = gpxFrame.speedInMps
|
|
time.sleep(timeLeft)
|
|
|
|
self.currentGear = speed2Gear(self.speedInMps)
|
|
timeElapsed += SAMPLE_INTERVAL_SECONDS
|
|
|
|
self.datetime = gpxFrame.datetime
|
|
|
|
def _generateInReverseMode(self, duration, listener):
|
|
print "Vehicle is reversing"
|
|
listener.handle(c.VEHICLEPROPERTY_GEAR_SELECTION, 0, c.VEHICLEGEAR_GEAR_REVERSE,
|
|
"GEAR_SELECTION")
|
|
listener.handle(c.VEHICLEPROPERTY_CURRENT_GEAR, 0, c.VEHICLEGEAR_GEAR_REVERSE,
|
|
"CURRENT_GEAR")
|
|
self.rpm = RPM_LOW
|
|
self.speedInMps = REVERSE_SPEED_METERS_PER_SECOND
|
|
curTime = 0
|
|
while curTime < duration:
|
|
self._generateFrame(listener)
|
|
self.odometerInKm += self.speedInMps * SAMPLE_INTERVAL_SECONDS / 1000
|
|
curTime += SAMPLE_INTERVAL_SECONDS
|
|
time.sleep(SAMPLE_INTERVAL_SECONDS)
|
|
# After reverse is done, set speed to 0
|
|
self.speedInMps = .0
|
|
|
|
def _generateInParkMode(self, duration, listener):
|
|
print "Vehicle is parked"
|
|
listener.handle(c.VEHICLEPROPERTY_GEAR_SELECTION, 0, c.VEHICLEGEAR_GEAR_PARK,
|
|
"GEAR_SELECTION")
|
|
listener.handle(c.VEHICLEPROPERTY_CURRENT_GEAR, 0, c.VEHICLEGEAR_GEAR_PARK,
|
|
"CURRENT_GEAR")
|
|
# Assume in park mode, engine is still on
|
|
self.rpm = RPM_LOW
|
|
self.speedInMps = .0
|
|
curTime = 0
|
|
while curTime < duration:
|
|
self._generateFrame(listener)
|
|
curTime += SAMPLE_INTERVAL_SECONDS
|
|
time.sleep(SAMPLE_INTERVAL_SECONDS)
|
|
|
|
def generate(self, listener):
|
|
# First, car is parked (probably in garage)
|
|
self._generateInParkMode(PARK_DURATION_SECONDS, listener)
|
|
# Second, car will reverse (out of garage)
|
|
self._generateInReverseMode(REVERSE_DURATION_SECONDS, listener)
|
|
|
|
trk = self.gpxDom.getElementsByTagName('trk')[0]
|
|
trkseg = trk.getElementsByTagName('trkseg')[0]
|
|
trkpts = trkseg.getElementsByTagName('trkpt')
|
|
|
|
print "Vehicle start moving forward"
|
|
listener.handle(c.VEHICLEPROPERTY_GEAR_SELECTION, 0, c.VEHICLEGEAR_GEAR_DRIVE,
|
|
"GEAR_SELECTION")
|
|
|
|
firstGpxFrame = GpxFrame(trkpts[0])
|
|
self.speedInMps = firstGpxFrame.speedInMps
|
|
self.datetime = firstGpxFrame.datetime
|
|
|
|
for i in xrange(1, len(trkpts)):
|
|
self._generateFromGpxFrame(GpxFrame(trkpts[i]), listener)
|