#!/usr/bin/env python
# -*- coding: utf-8 -*-
from wireviz.DataClasses import Connector, Cable
from graphviz import Graph
from wireviz import wv_colors, wv_helper
from wireviz.wv_colors import get_color_hex
from wireviz.wv_helper import awg_equiv, mm2_equiv, tuplelist2tsv, \
nested_html_table, flatten2d, index_if_list, html_line_breaks, \
graphviz_line_breaks, remove_line_breaks, open_file_read, open_file_write, \
html_colorbar, html_image, html_caption, manufacturer_info_field
from collections import Counter
from typing import List
from pathlib import Path
import re
class Harness:
def __init__(self):
self.color_mode = 'SHORT'
self.connectors = {}
self.cables = {}
self.additional_bom_items = []
def add_connector(self, name: str, *args, **kwargs) -> None:
self.connectors[name] = Connector(name, *args, **kwargs)
def add_cable(self, name: str, *args, **kwargs) -> None:
self.cables[name] = Cable(name, *args, **kwargs)
def add_bom_item(self, item: dict) -> None:
self.additional_bom_items.append(item)
def connect(self, from_name: str, from_pin: (int, str), via_name: str, via_pin: (int, str), to_name: str, to_pin: (int, str)) -> None:
for (name, pin) in zip([from_name, to_name], [from_pin, to_pin]): # check from and to connectors
if name is not None and name in self.connectors:
connector = self.connectors[name]
if pin in connector.pins and pin in connector.pinlabels:
if connector.pins.index(pin) == connector.pinlabels.index(pin):
# TODO: Maybe issue a warning? It's not worthy of an exception if it's unambiguous, but maybe risky?
pass
else:
raise Exception(f'{name}:{pin} is defined both in pinlabels and pins, for different pins.')
if pin in connector.pinlabels:
if connector.pinlabels.count(pin) > 1:
raise Exception(f'{name}:{pin} is defined more than once.')
else:
index = connector.pinlabels.index(pin)
pin = connector.pins[index] # map pin name to pin number
if name == from_name:
from_pin = pin
if name == to_name:
to_pin = pin
if not pin in connector.pins:
raise Exception(f'{name}:{pin} not found.')
self.cables[via_name].connect(from_name, from_pin, via_pin, to_name, to_pin)
if from_name in self.connectors:
self.connectors[from_name].activate_pin(from_pin)
if to_name in self.connectors:
self.connectors[to_name].activate_pin(to_pin)
def create_graph(self) -> Graph:
dot = Graph()
dot.body.append('// Graph generated by WireViz')
dot.body.append('// https://github.com/formatc1702/WireViz')
font = 'arial'
dot.attr('graph', rankdir='LR',
ranksep='2',
bgcolor='white',
nodesep='0.33',
fontname=font)
dot.attr('node', shape='record',
style='filled',
fillcolor='white',
fontname=font)
dot.attr('edge', style='bold',
fontname=font)
# prepare ports on connectors depending on which side they will connect
for _, cable in self.cables.items():
for connection_color in cable.connections:
if connection_color.from_port is not None: # connect to left
self.connectors[connection_color.from_name].ports_right = True
if connection_color.to_port is not None: # connect to right
self.connectors[connection_color.to_name].ports_left = True
for connector in self.connectors.values():
html = []
rows = [[connector.name if connector.show_name else None],
[f'P/N: {connector.pn}' if connector.pn else None,
html_line_breaks(manufacturer_info_field(connector.manufacturer, connector.mpn))],
[html_line_breaks(connector.type),
html_line_breaks(connector.subtype),
f'{connector.pincount}-pin' if connector.show_pincount else None,
connector.color, html_colorbar(connector.color)],
'' if connector.style != 'simple' else None,
[html_image(connector.image)],
[html_caption(connector.image)],
[html_line_breaks(connector.notes)]]
html.extend(nested_html_table(rows))
if connector.style != 'simple':
pinhtml = []
pinhtml.append('
')
for pin, pinlabel in zip(connector.pins, connector.pinlabels):
if connector.hide_disconnected_pins and not connector.visible_pins.get(pin, False):
continue
pinhtml.append(' ')
if connector.ports_left:
pinhtml.append(f' | {pin} | ')
if pinlabel:
pinhtml.append(f' {pinlabel} | ')
if connector.ports_right:
pinhtml.append(f' {pin} | ')
pinhtml.append('
')
pinhtml.append('
')
html = [row.replace('', '\n'.join(pinhtml)) for row in html]
html = '\n'.join(html)
dot.node(connector.name, label=f'<\n{html}\n>', shape='none', margin='0', style='filled', fillcolor='white')
if len(connector.loops) > 0:
dot.attr('edge', color='#000000:#ffffff:#000000')
if connector.ports_left:
loop_side = 'l'
loop_dir = 'w'
elif connector.ports_right:
loop_side = 'r'
loop_dir = 'e'
else:
raise Exception('No side for loops')
for loop in connector.loops:
dot.edge(f'{connector.name}:p{loop[0]}{loop_side}:{loop_dir}',
f'{connector.name}:p{loop[1]}{loop_side}:{loop_dir}')
# determine if there are double- or triple-colored wires in the harness;
# if so, pad single-color wires to make all wires of equal thickness
pad = any(len(colorstr) > 2 for cable in self.cables.values() for colorstr in cable.colors)
for cable in self.cables.values():
html = []
awg_fmt = ''
if cable.show_equiv:
# Only convert units we actually know about, i.e. currently
# mm2 and awg --- other units _are_ technically allowed,
# and passed through as-is.
if cable.gauge_unit =='mm\u00B2':
awg_fmt = f' ({awg_equiv(cable.gauge)} AWG)'
elif cable.gauge_unit.upper() == 'AWG':
awg_fmt = f' ({mm2_equiv(cable.gauge)} mm\u00B2)'
rows = [[cable.name if cable.show_name else None],
[f'P/N: {cable.pn}' if (cable.pn and not isinstance(cable.pn, list)) else None,
html_line_breaks(manufacturer_info_field(
cable.manufacturer if not isinstance(cable.manufacturer, list) else None,
cable.mpn if not isinstance(cable.mpn, list) else None))],
[html_line_breaks(cable.type),
f'{cable.wirecount}x' if cable.show_wirecount else None,
f'{cable.gauge} {cable.gauge_unit}{awg_fmt}' if cable.gauge else None,
'+ S' if cable.shield else None,
f'{cable.length} m' if cable.length > 0 else None,
cable.color, html_colorbar(cable.color)],
'',
[html_image(cable.image)],
[html_caption(cable.image)],
[html_line_breaks(cable.notes)]]
html.extend(nested_html_table(rows))
wirehtml = []
wirehtml.append('') # conductor table
wirehtml.append(' | |
')
for i, connection_color in enumerate(cable.colors, 1):
wirehtml.append(' ')
wirehtml.append(f' | ')
wirehtml.append(f' {wv_colors.translate_color(connection_color, self.color_mode)} | ')
wirehtml.append(f' | ')
wirehtml.append('
')
bgcolors = ['#000000'] + get_color_hex(connection_color, pad=pad) + ['#000000']
wirehtml.append(f' ')
wirehtml.append(f' ')
wirehtml.append(' ')
for j, bgcolor in enumerate(bgcolors[::-1]): # Reverse to match the curved wires when more than 2 colors
wirehtml.append(f' | ')
wirehtml.append(' ')
wirehtml.append(' | ')
wirehtml.append('
')
if(cable.category == 'bundle'): # for bundles individual wires can have part information
# create a list of wire parameters
wireidentification = []
if isinstance(cable.pn, list):
wireidentification.append(f'P/N: {cable.pn[i - 1]}')
manufacturer_info = manufacturer_info_field(
cable.manufacturer[i - 1] if isinstance(cable.manufacturer, list) else None,
cable.mpn[i - 1] if isinstance(cable.mpn, list) else None)
if manufacturer_info:
wireidentification.append(html_line_breaks(manufacturer_info))
# print parameters into a table row under the wire
if(len(wireidentification) > 0):
wirehtml.append(' ')
wirehtml.append(' ')
for attrib in wireidentification:
wirehtml.append(f' | {attrib} | ')
wirehtml.append(' ')
wirehtml.append(' |
')
if cable.shield:
wirehtml.append(' | |
') # spacer
wirehtml.append(' ')
wirehtml.append(' | ')
wirehtml.append(' Shield | ')
wirehtml.append(' | ')
wirehtml.append('
')
if isinstance(cable.shield, str):
# shield is shown with specified color and black borders
shield_color_hex = wv_colors.get_color_hex(cable.shield)[0]
attributes = f'height="6" bgcolor="{shield_color_hex}" border="2" sides="tb"'
else:
# shield is shown as a thin black wire
attributes = f'height="2" bgcolor="#000000" border="0"'
wirehtml.append(f' |
')
wirehtml.append(' | |
')
wirehtml.append('
')
html = [row.replace('', '\n'.join(wirehtml)) for row in html]
# connections
for connection_color in cable.connections:
if isinstance(connection_color.via_port, int): # check if it's an actual wire and not a shield
dot.attr('edge', color=':'.join(['#000000'] + wv_colors.get_color_hex(cable.colors[connection_color.via_port - 1], pad=pad) + ['#000000']))
else: # it's a shield connection
# shield is shown with specified color and black borders, or as a thin black wire otherwise
dot.attr('edge', color=':'.join(['#000000', shield_color_hex, '#000000']) if isinstance(cable.shield, str) else '#000000')
if connection_color.from_port is not None: # connect to left
from_port = f':p{connection_color.from_port}r' if self.connectors[connection_color.from_name].style != 'simple' else ''
code_left_1 = f'{connection_color.from_name}{from_port}:e'
code_left_2 = f'{cable.name}:w{connection_color.via_port}:w'
dot.edge(code_left_1, code_left_2)
from_string = f'{connection_color.from_name}:{connection_color.from_port}' if self.connectors[connection_color.from_name].show_name else ''
html = [row.replace(f'', from_string) for row in html]
if connection_color.to_port is not None: # connect to right
code_right_1 = f'{cable.name}:w{connection_color.via_port}:e'
to_port = f':p{connection_color.to_port}l' if self.connectors[connection_color.to_name].style != 'simple' else ''
code_right_2 = f'{connection_color.to_name}{to_port}:w'
dot.edge(code_right_1, code_right_2)
to_string = f'{connection_color.to_name}:{connection_color.to_port}' if self.connectors[connection_color.to_name].show_name else ''
html = [row.replace(f'', to_string) for row in html]
html = '\n'.join(html)
dot.node(cable.name, label=f'<\n{html}\n>', shape='box',
style='filled,dashed' if cable.category == 'bundle' else '', margin='0', fillcolor='white')
return dot
@property
def png(self):
from io import BytesIO
graph = self.create_graph()
data = BytesIO()
data.write(graph.pipe(format='png'))
data.seek(0)
return data.read()
@property
def svg(self):
from io import BytesIO
graph = self.create_graph()
data = BytesIO()
data.write(graph.pipe(format='svg'))
data.seek(0)
return data.read()
def output(self, filename: (str, Path), view: bool = False, cleanup: bool = True, fmt: tuple = ('pdf', )) -> None:
# graphical output
graph = self.create_graph()
for f in fmt:
graph.format = f
graph.render(filename=filename, view=view, cleanup=cleanup)
graph.save(filename=f'{filename}.gv')
# bom output
bom_list = self.bom_list()
with open_file_write(f'{filename}.bom.tsv') as file:
file.write(tuplelist2tsv(bom_list))
# HTML output
with open_file_write(f'{filename}.html') as file:
file.write('\n')
file.write('')
file.write('Diagram
')
with open_file_read(f'{filename}.svg') as svg:
file.write(re.sub(
'^<[?]xml [^?>]*[?]>[^<]*]*>',
'',
svg.read(1024), 1))
for svgdata in svg:
file.write(svgdata)
file.write('Bill of Materials
')
listy = flatten2d(bom_list)
file.write('')
file.write('')
for item in listy[0]:
file.write(f'| {item} | ')
file.write('
')
for row in listy[1:]:
file.write('')
for i, item in enumerate(row):
item_str = item.replace('\u00b2', '²')
align = 'align="right"' if listy[0][i] == 'Qty' else ''
file.write(f'| {item_str} | ')
file.write('
')
file.write('
')
file.write('')
def bom(self):
bom = []
bom_connectors = []
bom_cables = []
bom_extra = []
# connectors
connector_group = lambda c: (c.type, c.subtype, c.pincount, c.manufacturer, c.mpn, c.pn)
for group in Counter([connector_group(v) for v in self.connectors.values()]):
items = {k: v for k, v in self.connectors.items() if connector_group(v) == group}
shared = next(iter(items.values()))
designators = list(items.keys())
designators.sort()
conn_type = f', {remove_line_breaks(shared.type)}' if shared.type else ''
conn_subtype = f', {remove_line_breaks(shared.subtype)}' if shared.subtype else ''
conn_pincount = f', {shared.pincount} pins' if shared.style != 'simple' else ''
conn_color = f', {shared.color}' if shared.color else ''
name = f'Connector{conn_type}{conn_subtype}{conn_pincount}{conn_color}'
item = {'item': name, 'qty': len(designators), 'unit': '', 'designators': designators if shared.show_name else '',
'manufacturer': remove_line_breaks(shared.manufacturer), 'mpn': remove_line_breaks(shared.mpn), 'pn': shared.pn}
bom_connectors.append(item)
bom_connectors = sorted(bom_connectors, key=lambda k: k['item']) # https://stackoverflow.com/a/73050
bom.extend(bom_connectors)
# cables
# TODO: If category can have other non-empty values than 'bundle', maybe it should be part of item name?
# The category needs to be included in cable_group to keep the bundles excluded.
cable_group = lambda c: (c.category, c.type, c.gauge, c.gauge_unit, c.wirecount, c.shield, c.manufacturer, c.mpn, c.pn)
for group in Counter([cable_group(v) for v in self.cables.values() if v.category != 'bundle']):
items = {k: v for k, v in self.cables.items() if cable_group(v) == group}
shared = next(iter(items.values()))
designators = list(items.keys())
designators.sort()
total_length = sum(i.length for i in items.values())
cable_type = f', {remove_line_breaks(shared.type)}' if shared.type else ''
gauge_name = f' x {shared.gauge} {shared.gauge_unit}' if shared.gauge else ' wires'
shield_name = ' shielded' if shared.shield else ''
name = f'Cable{cable_type}, {shared.wirecount}{gauge_name}{shield_name}'
item = {'item': name, 'qty': round(total_length, 3), 'unit': 'm', 'designators': designators,
'manufacturer': remove_line_breaks(shared.manufacturer), 'mpn': remove_line_breaks(shared.mpn), 'pn': shared.pn}
bom_cables.append(item)
# bundles (ignores wirecount)
wirelist = []
# list all cables again, since bundles are represented as wires internally, with the category='bundle' set
for bundle in self.cables.values():
if bundle.category == 'bundle':
# add each wire from each bundle to the wirelist
for index, color in enumerate(bundle.colors, 0):
wirelist.append({'type': bundle.type, 'gauge': bundle.gauge, 'gauge_unit': bundle.gauge_unit, 'length': bundle.length, 'color': color, 'designator': bundle.name,
'manufacturer': remove_line_breaks(index_if_list(bundle.manufacturer, index)),
'mpn': remove_line_breaks(index_if_list(bundle.mpn, index)),
'pn': index_if_list(bundle.pn, index)})
# join similar wires from all the bundles to a single BOM item
wire_group = lambda w: (w.get('type', None), w['gauge'], w['gauge_unit'], w['color'], w['manufacturer'], w['mpn'], w['pn'])
for group in Counter([wire_group(v) for v in wirelist]):
items = [v for v in wirelist if wire_group(v) == group]
shared = items[0]
designators = [i['designator'] for i in items]
designators = list(dict.fromkeys(designators)) # remove duplicates
designators.sort()
total_length = sum(i['length'] for i in items)
wire_type = f', {remove_line_breaks(shared["type"])}' if shared.get('type', None) else ''
gauge_name = f', {shared["gauge"]} {shared["gauge_unit"]}' if shared.get('gauge', None) else ''
gauge_color = f', {shared["color"]}' if 'color' in shared != '' else ''
name = f'Wire{wire_type}{gauge_name}{gauge_color}'
item = {'item': name, 'qty': round(total_length, 3), 'unit': 'm', 'designators': designators,
'manufacturer': shared['manufacturer'], 'mpn': shared['mpn'], 'pn': shared['pn']}
bom_cables.append(item)
bom_cables = sorted(bom_cables, key=lambda k: k['item']) # sort list of dicts by their values (https://stackoverflow.com/a/73050)
bom.extend(bom_cables)
for item in self.additional_bom_items:
name = item['description'] if item.get('description', None) else ''
if isinstance(item.get('designators', None), List):
item['designators'].sort() # sort designators if a list is provided
item = {'item': name, 'qty': item.get('qty', None), 'unit': item.get('unit', None), 'designators': item.get('designators', None),
'manufacturer': item.get('manufacturer', None), 'mpn': item.get('mpn', None), 'pn': item.get('pn', None)}
bom_extra.append(item)
bom_extra = sorted(bom_extra, key=lambda k: k['item'])
bom.extend(bom_extra)
return bom
def bom_list(self):
bom = self.bom()
keys = ['item', 'qty', 'unit', 'designators'] # these BOM columns will always be included
for fieldname in ['pn', 'manufacturer', 'mpn']: # these optional BOM columns will only be included if at least one BOM item actually uses them
if any(fieldname in x and x.get(fieldname, None) for x in bom):
keys.append(fieldname)
bom_list = []
# list of staic bom header names, headers not specified here are generated by capitilising the internal name
bom_headings = {
"pn": "P/N",
"mpn": "MPN"
}
bom_list.append([(bom_headings[k] if k in bom_headings else k.capitalize()) for k in keys]) # create header row with keys
for item in bom:
item_list = [item.get(key, '') for key in keys] # fill missing values with blanks
item_list = [', '.join(subitem) if isinstance(subitem, List) else subitem for subitem in item_list] # convert any lists into comma separated strings
item_list = ['' if subitem is None else subitem for subitem in item_list] # if a field is missing for some (but not all) BOM items
bom_list.append(item_list)
return bom_list