Skip to content

Reference

Handle VLCB formatting

It generates the appropriate strings which can be sent to CBUS / VLCB

Attributes:

Name Type Description
can_id

The Can ID for your software (default = 60)

Source code in src/pyvlcb/__init__.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
class VLCB:
    """Handle VLCB formatting

    It generates the appropriate strings which can be sent to CBUS / VLCB

    Attributes:
        can_id: The Can ID for your software (default = 60)
        debug: Debug flag, set to False by the constructor

    """

    # 60 is default canid for canusb4 (127 is dcc controller)
    def __init__(self, can_id: Optional[int] = 60) -> None:
        """Inits VLCB with a can_id

        Args:
            can_id: The can_id for the software (default = 60)
        """
        self.can_id = can_id
        self.debug = False

    # Takes input bytestring and parses header / data
    # Does not try and interpret op-code - that is left to VLCBFormat
    def parse_input(self, input_bytes: Union[bytes, str]) -> "VLCBFormat":
        """Parse a raw CBUS packet into priority, can_id and data

        The frame is read as ``:S`` + four hex header characters + one
        N / RTR character + data + a final terminator character, which is
        dropped (expected to be ``;``).

        Args:
            input_bytes: Raw frame, either bytes (decoded as UTF-8) or str

        Returns:
            VLCBFormat: Built from the priority (top four header bits),
            the can_id (header bits 11 to 5) and the data string

        Raises:
            ValueError: If the frame is too short, does not start with ':',
                is not a Standard ('S') frame or has a non-hex header

        """
        # Also allow string (no need to decode)
        if isinstance(input_bytes, str):
            input_string = input_bytes
        else:
            input_string = input_bytes.decode("utf-8")
        # Index 6 (the N / RTR flag) is read below, so at least 7 characters
        # are needed; real packets are longer still
        if len(input_string) < 7:
            raise ValueError(f"input_bytes '{input_string}' is too short.")
        if input_string[0] != ":":
            raise ValueError(f"No start frame in '{input_string}'")
        if input_string[1] != "S":
            raise ValueError(
                f"Format not supported - only Standard frames allowed in {input_string}"
            )
        header = input_string[2:6]
        try:
            header_val = int(header, 16)
        except ValueError as err:
            raise ValueError(f"Invalid format, number expected {header}") from err
        logger.debug("Header %s", hex(header_val))
        priority = (header_val & 0xf000) >> 12
        logger.debug("Priority %s", format(priority, "b"))
        can_id = (header_val & 0xfe0) >> 5
        logger.debug("Can ID %s", can_id)
        # Next is N / RTR can be ignored
        logger.debug("N / RTR %s", input_string[6])
        # Data is the rest, excluding the final terminator character
        data = input_string[7:-1]
        logger.debug("Data %s", data)
        return VLCBFormat(priority, can_id, data)

    # Parse and format into standard log format
    # (date, direction, full message, can_id, op_code, data)
    # For log all values are returned as strings - note that the number (log entry number) is not returned
    def log_entry(self, input_string: str) -> list[str]:
        """Parse a log entry and return as a list of string values

        Args:
            input_string: "num,date,direction,message" as a single comma
                separated string. Only the first three commas split fields,
                so the message itself may contain commas

        Returns:
            list[str]: [date, direction, message, can_id, opcode, data]
            where opcode is "<code> - <mnemonic>". If the message is not a
            valid packet the last three values are "??", "" and
            "Invalid data"

        Raises:
            IndexError: If input_string has fewer than four fields

        """
        # First remove number and date from the front of the string
        entry_parts = input_string.split(',', 3)
        date_string = entry_parts[1]
        direction = entry_parts[2]
        message = entry_parts[3]
        # parse_input signals an invalid packet by raising ValueError
        try:
            vlcb_entry = self.parse_input(message)
        except ValueError:
            return [date_string, direction, message, "??", "", "Invalid data"]
        # opcode is first two chars of data
        opcode = vlcb_entry.data[0:2]
        opcode_string = f'{opcode} - {VLCBOpcode.opcode_mnemonic(opcode)}'
        data_string = dict_to_string(VLCBOpcode.parse_data(vlcb_entry.data))
        return [date_string, direction, message, str(vlcb_entry.can_id), opcode_string, data_string]

    # num_to_1hexstr / num_to_2hexstr / num_to_4hexstr static methods moved to utils

    # Create header using low priority and can_id (or self.can_id)
    # If opcode provided, but no priority then appropriate min code looked up
    # MajPri would be based on packet aging - needs to be managed outside of this
    def make_header(self,
                majpri: int = 0b10,
                minpri: Optional[int] = None,
                can_id: Optional[int] = None,
                opcode: Optional[Union[int, str]] = None) -> str:
        """Create a CBUS/VLCB header

        Args:
            majpri: Major priority, placed in header bits 15 and 14
            minpri: Minor priority, placed in header bits 13 and 12 (lower
                number is higher priority). If None it is looked up from
                opcode, falling back to 0b11
            can_id: Can ID, placed from bit 5 upwards. If None then
                self.can_id is used
            opcode: Op-code used only to look up minpri. The methods of this
                class pass it as a two character hex string such as '0D'

        Returns:
            String: ``:S`` + four upper case hex digits + ``N``
        """
        if can_id is None:
            can_id = self.can_id

        if minpri is None and opcode is not None:
            minpri = VLCBOpcode.opcode_priority(opcode)

        # If no priority was supplied or found then use default low priority
        if minpri is None:
            minpri = 0b11

        header_val = (majpri << 14) + (minpri << 12) + (can_id << 5)
        # Zero pad to four hex digits; keep only the last four if longer
        header_to_hex = f"{header_val:04X}"[-4:]
        return f':S{header_to_hex}N'

    def _frame(self, opcode: str, payload: str = "") -> str:
        """Return header + opcode + payload + ';' for a two char hex opcode"""
        return f"{self.make_header(opcode=opcode)}{opcode}{payload};"

    # Discover nodes
    def discover(self) -> str:
        """Create a discovery string

        Uses op-code QNN (0D)

        Returns:
            String: A string for the request
        """
        return self._frame('0D')

    # Discover number of events configured
    def discover_evn(self, node_id: int) -> str:
        """Create a discover number of events for a node

        Uses op-code RQEVN (58)

        Args:
            node_id (int): Node ID to query

        Returns:
            String: A string for the request
        """
        return self._frame('58', num_to_2hexstr(node_id))

    # Discover number of events available
    def discover_nevn(self, node_id: int) -> str:
        """Create a discover number of events available for a node

        Uses op-code NNEVN (56)

        Args:
            node_id (int): Node ID to query

        Returns:
            String: A string for the request
        """
        return self._frame('56', num_to_2hexstr(node_id))

    # Discover stored events NERD
    def discover_nerd(self, node_id: int) -> str:
        """Create a discover stored events for a node

        Uses op-code NERD (57)

        Args:
            node_id (int): Node ID to query

        Returns:
            String: A string for the request
        """
        return self._frame('57', num_to_2hexstr(node_id))

    # Emergency stop all locos
    # RESTP
    def loco_stop_all(self) -> str:
        """Create an emergency stop all locos

        Uses op-code RESTP (0A)

        Returns:
            String: A string for the request
        """
        return self._frame('0A')

    # node and ev should be the IDs - state either "on" or "off" / True or False
    def accessory_command(self, node_id: int, ev_id: Union[int, str], state: Union[str, bool]) -> str:
        """Create an accessory command

        Uses appropriate Accessory On / Off command
        Defaulting to short, but using long if > 0xffff

        Args:
            node_id: Node ID to query
            ev_id: Event ID, as an int or as a string in base 10 or with a
                base prefix such as "0x"
            state: State to change to can be "on" or "off" / True or False

        Returns:
            String: A string for the request

        Raises:
            ValueError: If ev_id is a string that is not a valid number
        """
        # int() with an explicit base only accepts strings, so an ev_id
        # that is already a number is used as it is.
        # Base 0 automatically handles base 10 or hex
        if isinstance(ev_id, str):
            ev_id = int(ev_id, 0)
        # determine if long or short
        if ev_id <= 0xffff:
            return self.accessory_short_command(node_id, ev_id, state)
        return self.accessory_long_command(node_id, ev_id, state)

    # Note that short is the same as long but different code and node_id is added (already included in long)
    def accessory_short_command(self, node_id: int, ev_id: int, state: Union[str, bool]) -> str:
        """Create an accessory short command

        Uses ASON (98) or ASOF (99)

        Args:
            node_id: Node ID to query
            ev_id: Event ID
            state: True or "on" turns on, any other value turns off

        Returns:
            String: A string for the request
        """
        # ASON for on, ASOF for everything else
        opcode = '98' if state in (True, "on") else '99'
        return self._frame(opcode, f"{num_to_2hexstr(node_id)}{num_to_2hexstr(ev_id)}")

    def accessory_long_command(self, node_id: int, ev_id: int, state: Union[str, bool]) -> str:
        """Create an accessory long command

        Uses ACON (90) or ACOF (91)

        Args:
            node_id: Not used here, the node is already included in ev_id
            ev_id: Event ID
            state: True or "on" turns on, any other value turns off

        Returns:
            String: A string for the request
        """
        # ACON for on, ACOF for everything else
        opcode = '90' if state in (True, "on") else '91'
        return self._frame(opcode, num_to_4hexstr(ev_id))

    # RLOC (Allocate loco) :SB040N40D446;
    # Short address upper address all zeros, only 6 bits of the lower byte are used (1 to 127) 0 is decoderless
    # :SB040N40D446 D446 becomes 5190(10) = 1446(H) + C000 (highest 2 bits set by CAB - indicate long mode)

    # Generate code to allocate a loco
    # Assume long code, but if long = False and ID <= 127 then use short mode
    def allocate_loco(self, loco_id: int, long: Optional[bool] = True) -> str:
        """Create an allocate loco request

        Uses RLOC (40)

        Args:
            loco_id: Loco ID (long or short number)
            long: Long loco ID (True) or short loco ID (False)

        Returns:
            String: A string for the request

        Raises:
            ValueError: If a short loco ID is larger than 127
        """
        # Generate RLOC to allocate loco to a session
        if long:
            # Top two bits set indicate long mode
            loco_id = loco_id | 0xC000
        elif loco_id > 127:
            raise ValueError(f"Invalid short code. Loco ID {loco_id} is larger than 127")
        return self._frame('40', num_to_2hexstr(loco_id))

    def release_loco(self, session_id: int) -> str:
        """Create a release loco request

        Uses KLOC (21)

        Args:
            session_id: Session number

        Returns:
            String: A string for the request
        """
        return self._frame('21', num_to_1hexstr(session_id))

    def _gloc(self, loco_id: int, long: Optional[bool], flag: str) -> str:
        """Return a GLOC (61) request for loco_id ending with the two char flag"""
        if long:
            # Top two bits set indicate long mode
            loco_id = loco_id | 0xC000
        elif loco_id > 127:
            raise InvalidLocoError(f"Invalid short code {loco_id}")
        return self._frame('61', f"{num_to_2hexstr(loco_id)}{flag}")

    def steal_loco(self, loco_id: int, long: Optional[bool] = True) -> str:
        """Create a steal loco request

        Takes the loco and other connections to the loco should be terminated.
        Uses GLOC (61) with flag 01

        Args:
            loco_id: Loco ID (long or short number)
            long: Long loco ID (True) or short loco ID (False)

        Returns:
            String: A string for the request

        Raises:
            InvalidLocoError: If a short loco ID is larger than 127
        """
        # GLOC 61 - flag = 1 for steal, flag = 2 for share
        return self._gloc(loco_id, long, '01')

    def share_loco(self, loco_id: int, long: Optional[bool] = True) -> str:
        """Create a share loco request

        Takes the loco and other connections to the loco can remain.
        Uses GLOC (61) with flag 02

        Args:
            loco_id: Loco ID (long or short number)
            long: Long loco ID (True) or short loco ID (False)

        Returns:
            String: A string for the request

        Raises:
            InvalidLocoError: If a short loco ID is larger than 127
        """
        # GLOC 61 - flag = 1 for steal, flag = 2 for share
        return self._gloc(loco_id, long, '02')

    def keep_alive(self, session_id: int) -> str:
        """Create a keep alive request

        For any loco allocated send a keep alive at least every 4 seconds
        Uses DKEEP (23)

        Args:
            session_id: Session ID

        Returns:
            String: A string for the request
        """
        return self._frame('23', num_to_1hexstr(session_id))

    def loco_speed_dir(self, session_id: int, speed: int, direction: int) -> str:
        """Set loco speed and direction based on separate arguments

        Same as loco_speeddir but this takes 2 arguments, whereas loco_speeddir needs a combined value
        Maximum call this once every 32 milliseconds

        Uses DSPD (47)

        Args:
            session_id: Session ID
            speed: 0 to 127 (1 is increased to 2 to avoid emergency stop)
            direction: 1 = forward, 0 = reverse

        Returns:
            String: A string for the request

        Raises:
            ValueError: If speed is out of range, or invalid direction
        """
        if not 0 <= speed <= 127:
            raise ValueError("Invalid speed specified. Must be in range 0 to 127")
        if direction not in (0, 1):
            raise ValueError("Direction is not valid. Use 1 for forward, 0 for reverse")
        # special case - speed 1 would be an emergency stop, so use 2
        if speed == 1:
            speed = 2
        # Direction goes in the most significant bit of the byte
        speeddir = (direction * 0x80) + speed
        return self.loco_speeddir(session_id, speeddir)

    # Set loco speed and direction (always done together)
    def loco_speeddir(self, session_id: int, speeddir: int) -> str:
        """Set loco speed and direction

        Maximum call this once every 32 milliseconds
        Needs combined speed and direction value.
        If speed is set to 1 then that is considered an emergency stop

        Uses DSPD (47)

        Args:
            session_id: Session ID
            speeddir: Unsigned 8 bit number. MSB is direction, 7 bits for speed

        Returns:
            String: A string for the request
        """
        return self._frame('47', f"{num_to_1hexstr(session_id)}{num_to_1hexstr(speeddir)}")

    # Set function using DFUN - needs to be provided with the two bytes
    # First byte is group (1 = F1 to F4, 2 = F5 to F8, 3 = F9 to F12)
    # 4 = F13 to 19, 5 = F20 to F28
    # Second byte is 1 bit per function - set 1 for on, 0 for off, lsb to right
    # eg. 1 = 0001, 2 = 0010
    def loco_set_dfun(self, session_id: int, byte1: int, byte2: int) -> str:
        """Create a set function request

        Uses DFUN (60)

        Args:
            session_id: Session ID
            byte1: Function group. 1 = F1 to F4, 2 = F5 to F8, 3 = F9 to F12, 4 = F13 to 19, 5 = F20 to F28
            byte2: 1 bit per function 1=on, 0=off. LSB to right eg 1 = 0001, 2 = 0010

        Returns:
            String: A string for the request
        """
        payload = f"{num_to_1hexstr(session_id)}{num_to_1hexstr(byte1)}{num_to_1hexstr(byte2)}"
        return self._frame('60', payload)

    def loco_set_function(self, session_id: int, function_num, function_list) -> str:
        """Create a set function request using the function list

        Sends the entire group of functions where the function_num resides
        This is an alternative to loco_set_dfun as this calculates the bytes
        this method can only be used for functions 0 to 27

        Uses DFUN (60)

        Args:
            session_id: Session ID
            function_num: Function number
            function_list: List of current function statuses

        Returns:
            String: A string for the request

        Raises:
            ValueError: Typically raised from f_to_bytes
        """
        # The two values from f_to_bytes are inserted into the request as
        # they are (presumably already hex strings - confirm in utils)
        byte1_2 = f_to_bytes(function_num, function_list)
        return self._frame('60', f"{num_to_1hexstr(session_id)}{byte1_2[0]}{byte1_2[1]}")

__init__(can_id=60)

Inits VLCB with a can_id

Parameters:

Name Type Description Default
can_id Optional[int]

The can_id for the software (default = 60)

60
Source code in src/pyvlcb/__init__.py
44
45
46
47
48
49
50
51
def __init__(self, can_id: Optional[int] = 60) -> None:
    """Set up a VLCB instance

    Args:
        can_id: Can ID stored on the instance for this software (default = 60)
    """
    # Debug starts switched off
    self.debug = False
    self.can_id = can_id

accessory_command(node_id, ev_id, state)

Create an accessory command

Uses appropriate Accessory On / Off command Defaulting to short, but using long if > 0xffff

Parameters:

Name Type Description Default
node_id int

Node ID to query

required
ev_id int

Event ID

required
state Union[str, bool]

State to change to can be "on" or "off" / True or False

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
def accessory_command(self, node_id: int, ev_id: Union[int, str], state: Union[str, bool]) -> str:
    """Create an accessory command

    Uses appropriate Accessory On / Off command
    Defaulting to short, but using long if > 0xffff

    Args:
        node_id: Node ID to query
        ev_id: Event ID, as an int or as a string in base 10 or with a
            base prefix such as "0x"
        state: State to change to can be "on" or "off" / True or False

    Returns:
        String: A string for the request

    Raises:
        ValueError: If ev_id is a string that is not a valid number
    """
    # int() with an explicit base only accepts strings, so an ev_id that
    # is already a number is used as it is.
    # Base 0 automatically handles base 10 or hex
    if isinstance(ev_id, str):
        ev_id = int(ev_id, 0)
    # determine if long or short
    if ev_id <= 0xffff:
        return self.accessory_short_command(node_id, ev_id, state)
    return self.accessory_long_command(node_id, ev_id, state)

accessory_long_command(node_id, ev_id, state)

Create an accessory long command

Uses ACON (90) or ACOF (91)

Parameters:

Name Type Description Default
node_id int

Node ID to query

required
ev_id int

Event ID

required
state Union[str, bool]

State to change to can be "on" or "off" / True or False

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
def accessory_long_command(self, node_id: int, ev_id: int, state: Union[str, bool]) -> str:
    """Create an accessory long command

    Uses ACON (90) or ACOF (91)

    Args:
        node_id: Node ID (not used when building the long request)
        ev_id: Event ID
        state: True or "on" turns on, any other value turns off

    Returns:
        String: A string for the request
    """
    # ACON (90) for on, ACOF (91) for everything else
    opcode = '90' if state in (True, "on") else '91'
    header = self.make_header(opcode=opcode)
    return f"{header}{opcode}{num_to_4hexstr(ev_id)};"

accessory_short_command(node_id, ev_id, state)

Create an accessory short command

Uses ASON (98) or ASOF (99)

Parameters:

Name Type Description Default
node_id int

Node ID to query

required
ev_id int

Event ID

required
state Union[str, bool]

State to change to can be "on" or "off" / True or False

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def accessory_short_command(self, node_id: int, ev_id: int, state: Union[str, bool]) -> str:
    """Create an accessory short command

    Uses ASON (98) or ASOF (99)

    Args:
        node_id: Node ID to query
        ev_id: Event ID
        state: True or "on" turns on, any other value turns off

    Returns:
        String: A string for the request
    """
    # ASON (98) for on, ASOF (99) for everything else
    opcode = '98' if state in (True, "on") else '99'
    header = self.make_header(opcode=opcode)
    node_hex = num_to_2hexstr(node_id)
    event_hex = num_to_2hexstr(ev_id)
    return f"{header}{opcode}{node_hex}{event_hex};"

allocate_loco(loco_id, long=True)

Create an allocate loco request

Uses RLOC (40)

Parameters:

Name Type Description Default
loco_id int

Loco ID (long or short number)

required
long Optional[bool]

Long loco ID (True) or short loco ID (False)

True

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
def allocate_loco(self, loco_id: int, long: Optional[bool] = True) -> str:
    """Create an allocate loco request

    Uses RLOC (40)

    Args:
        loco_id: Loco ID (long or short number)
        long: Long loco ID (True) or short loco ID (False)

    Returns:
        String: A string for the request

    Raises:
        ValueError: If a short loco ID is larger than 127
    """
    # Generate RLOC to allocate loco to a session
    if long:
        # Top two bits set indicate long mode
        loco_id = loco_id | 0xC000
    elif loco_id > 127:
        raise ValueError(f"Invalid short code. Loco ID {loco_id} is larger than 127")
    return f"{self.make_header(opcode='40')}40{num_to_2hexstr(loco_id)};"

discover()

Create a discovery string

Uses op-code QNN (0D)

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
173
174
175
176
177
178
179
180
181
182
def discover(self) -> str:
    """Create a discovery string

    Uses op-code QNN (0D)

    Returns:
        String: A string for the request
    """
    # QNN: header followed by the op-code and the terminator
    header = self.make_header(opcode='0D')
    return header + '0D;'

discover_evn(node_id)

Create a discover number of events for a node

Uses op-code RQEVN (58)

Parameters:

Name Type Description Default
node_id int

Node ID to query

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
185
186
187
188
189
190
191
192
193
194
195
196
def discover_evn(self, node_id: int) -> str:
    """Create a discover number of events for a node

    Uses op-code RQEVN (58)

    Args:
        node_id (int): Node ID to query

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='58')
    node_hex = num_to_2hexstr(node_id)
    return f"{header}58{node_hex};"

discover_nerd(node_id)

Create a discover stored events for a node

Uses op-code NERD (57)

Parameters:

Name Type Description Default
node_id int

Node ID to query

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
213
214
215
216
217
218
219
220
221
222
223
224
def discover_nerd(self, node_id: int) -> str:
    """Create a discover stored events for a node

    Uses op-code NERD (57)

    Args:
        node_id (int): Node ID to query

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='57')
    node_hex = num_to_2hexstr(node_id)
    return f"{header}57{node_hex};"

discover_nevn(node_id)

Create a discover number of events available for a node

Uses op-code NNEVN (56)

Parameters:

Name Type Description Default
node_id int

Node ID to query

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
199
200
201
202
203
204
205
206
207
208
209
210
def discover_nevn(self, node_id: int) -> str:
    """Create a discover number of events available for a node

    Uses op-code NNEVN (56)

    Args:
        node_id (int): Node ID to query

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='56')
    node_hex = num_to_2hexstr(node_id)
    return f"{header}56{node_hex};"

keep_alive(session_id)

Create a keep alive request

For any loco allocated send a keep alive at least every 4 seconds Uses DKEEP (23)

Parameters:

Name Type Description Default
session_id int

Session ID

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
389
390
391
392
393
394
395
396
397
398
399
400
401
def keep_alive(self, session_id: int) -> str:
    """Create a keep alive request

    For any loco allocated send a keep alive at least every 4 seconds
    Uses DKEEP (23)

    Args:
        session_id: Session ID

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='23')
    session_hex = num_to_1hexstr(session_id)
    return f"{header}23{session_hex};"

loco_set_dfun(session_id, byte1, byte2)

Create a set function request

Uses DFUN (60)

Parameters:

Name Type Description Default
session_id int

Session ID

required
byte1 bytes

Function group. 1 = F1 to F4, 2 = F5 to F8, 3 = F9 to F12, 4 = F13 to 19, 5 = F20 to F28

required
byte2 bytes

1 bit per function 1=on, 0=off. LSB to right eg 1 = 0001, 2 = 0010

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def loco_set_dfun(self, session_id: int, byte1: bytes, byte2: bytes) -> str:
    """Create a set function request

    Uses DFUN (60)

    Args:
        session_id: Session ID
        byte1: Function group. 1 = F1 to F4, 2 = F5 to F8, 3 = F9 to F12, 4 = F13 to 19, 5 = F20 to F28
        byte2: 1 bit per function 1=on, 0=off. LSB to right eg 1 = 0001, 2 = 0010

    Returns:
        String: A string for the request
    """
    # NOTE(review): byte1 / byte2 are annotated as bytes but go through the
    # same num_to_1hexstr helper as the int session_id - confirm the type
    header = self.make_header(opcode='60')
    session_hex = num_to_1hexstr(session_id)
    group_hex = num_to_1hexstr(byte1)
    status_hex = num_to_1hexstr(byte2)
    return f"{header}60{session_hex}{group_hex}{status_hex};"

loco_set_function(session_id, function_num, function_list)

Create a set function request using the function list. Sends the entire group of functions where the function_num resides. This is an alternative to loco_set_dfun, as this method calculates the bytes. This method can only be used for functions 0 to 27.

Uses DFUN (60)

Parameters:

Name Type Description Default
session_id int

Session ID

required
function_num

Function number

required
function_list

List of current function statuses

required

Returns:

Name Type Description
String str

A string for the request

Raises:

Type Description
ValueError

Typically raised from f_to_bytes

Source code in src/pyvlcb/__init__.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
def loco_set_function(self, session_id: int, function_num, function_list) -> str:
    """Create a set function request using the function list

    Sends the entire group of functions where the function_num resides
    This is an alternative to loco_set_dfun as this calculates the bytes
    this method can only be used for functions 0 to 27

    Uses DFUN (60)

    Args:
        session_id: Session ID
        function_num: Function number
        function_list: List of current function statuses

    Returns:
        String: A string for the request

    Raises:
        ValueError: Typically raised from f_to_bytes
    """
    # The first two values from f_to_bytes go into the request unchanged
    encoded = f_to_bytes(function_num, function_list)
    header = self.make_header(opcode='60')
    session_hex = num_to_1hexstr(session_id)
    return f"{header}60{session_hex}{encoded[0]}{encoded[1]};"

loco_speed_dir(session_id, speed, direction)

Set loco speed and direction based on separate arguments

Same as loco_speeddir but this takes 2 arguments, whereas loco_speeddir needs a combined value Maximum call this once every 32 milliseconds

Uses DSPD (47)

Parameters:

Name Type Description Default
session_id int

Session ID

required
speed int

0 to 127 (1 is increased to 2 to avoid emergency stop)

required
direction int

1 = forward, 0 = reverse

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
def loco_speed_dir(self, session_id: int, speed: int, direction: int) -> str:
    """Set loco speed and direction based on separate arguments

    Same as loco_speeddir but this takes 2 arguments, whereas loco_speeddir needs a combined value
    Maximum call this once every 32 milliseconds

    Uses DSPD (47)

    Args:
        session_id: Session ID
        speed: 0 to 127 (1 is increased to 2 to avoid emergency stop)
        direction: 1 = forward, 0 = reverse

    Returns:
        String: A string for the request

    Raises:
        ValueError: If speed is out of range, or invalid direction
    """
    if speed > 127 or speed < 0:
        raise ValueError("Invalid speed specified. Must be in range 0 to 127")
    if direction > 1 or direction < 0:
        raise ValueError("Direction is not valid. Use 1 for forward, 0 for reverse")
    # A speed of 1 would be an emergency stop, so it is sent as 2
    effective_speed = 2 if speed == 1 else speed
    # Direction goes in the most significant bit of the byte
    combined = (direction * 0x80) + effective_speed
    return self.loco_speeddir(session_id, combined)

loco_speeddir(session_id, speeddir)

Set loco speed and direction

Maximum call this once every 32 milliseconds Needs combined speed and direction value. If speed is set to 1 then that is considered an emergency stop

Uses DSPD (47)

Parameters:

Name Type Description Default
session_id int

Session ID

required
speeddir int

Unsigned 8 bit number. MSB is direction, 7 bits for speed

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def loco_speeddir (self, session_id: int, speeddir: int) -> str:
    """Build a speed / direction request from a combined value

    Call this at most once every 32 milliseconds.
    The speed and direction must already be merged into one value.
    If the speed part is 1 then that is considered an emergency stop.

    Uses DSPD (47)

    Args:
        session_id: Session ID
        speeddir: Unsigned 8 bit number. MSB is direction, 7 bits for speed

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='47')
    session_hex = num_to_1hexstr(session_id)
    speeddir_hex = num_to_1hexstr(speeddir)
    # Frame is header, opcode, session byte, speed/direction byte, terminator
    return f"{header}47{session_hex}{speeddir_hex};"

loco_stop_all()

Create an emergency stop all locos

Uses op-code RESTP (0A)

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
228
229
230
231
232
233
234
235
236
def loco_stop_all (self) -> str:
    """Build an emergency stop request for all locos

    Uses op-code RESTP (0A)

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='0A')
    # No data bytes: just the header, the opcode and the terminator
    return "{}0A;".format(header)

log_entry(input_string)

Parse a log entry and return as a list of string values

Parameters:

Name Type Description Default
input_string string

String consisting of num, date, direction and message, comma separated, as a single string

required

Returns (list[str]): List of strings

Source code in src/pyvlcb/__init__.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def log_entry(self, input_string: str) -> list[str]:
    """Parse a log entry and return as a list of string values

    Args:
        input_string (string): Comma separated string of num, date,
            direction, message. Only the first three commas are treated
            as separators, so commas inside the message are kept.

    Returns (list[str]):
        For a packet that parses: [date, direction, message, can_id,
        opcode, data]. For a message that parse_input rejects:
        [message, "??", "", "Invalid data"].

    Raises:
        IndexError: If input_string has fewer than four comma separated fields

    """
    # Field 0 (the entry number) is not used
    entry_parts = input_string.split(',', 3)
    date_string = entry_parts[1]
    direction = entry_parts[2]
    message = entry_parts[3]
    # parse_input reports a malformed packet by raising (ValueError, or
    # IndexError for a frame that is too short), so convert that into the
    # "Invalid data" row instead of letting one bad log line abort the caller
    try:
        vlcb_entry = self.parse_input (message)
    except (ValueError, IndexError):
        vlcb_entry = False
    # The False check is kept for any parse_input that returns False on error
    if vlcb_entry is False:
        return [message, "??", "", "Invalid data"]
    # convert op-code to string
    # opcode is first two chars of data
    opcode = vlcb_entry.data[0:2]
    opcode_string = f'{opcode} - {VLCBOpcode.opcode_mnemonic(opcode)}'
    data_string = dict_to_string(VLCBOpcode.parse_data(vlcb_entry.data))
    return [date_string, direction, message, str(vlcb_entry.can_id), opcode_string, data_string]

make_header(majpri=2, minpri=None, can_id=None, opcode=None)

Create a CBUS/VLCB header

Parameters:

Name Type Description Default
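majpri int

Major priority, placed above the minor priority in the header

2
minpri Optional[int]

Minor priority; looked up from opcode when omitted, otherwise defaults to 0b11 (low priority)

None
can_id Optional[int]

CAN ID; the instance can_id is used when omitted

None
opcode Optional[int]

Opcode used to look up the minor priority when minpri is omitted

None

Returns:

Name Type Description
String str

The frame header in the form :SxxxxN, where xxxx is the header value as four hex digits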

Source code in src/pyvlcb/__init__.py
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def make_header(self, 
            majpri: int = 0b10, 
            minpri: Optional[int] = None, 
            can_id: Optional[int] = None, 
            opcode: Optional[str] = None) -> str:
    """Create a CBUS/VLCB header

    The header value is built as (majpri << 14) + (minpri << 12) +
    (can_id << 5) and rendered as four upper case hex digits.

    Args:
        majpri: Major priority
        minpri: Minor priority (lower number is higher priority). When
            omitted it is looked up from opcode, falling back to 0b11
        can_id: CAN ID to use. When omitted the instance can_id is used
        opcode: Opcode used for the priority lookup, as passed by the
            request methods (two character hex string such as '47')

    Returns:
        String: The header in the form ':SxxxxN'
    """
    if can_id is None:
        can_id = self.can_id

    if minpri is None and opcode is not None:
        minpri = VLCBOpcode.opcode_priority(opcode)

    # If no priority was supplied or found then use default low priority
    # Lower number is higher priority
    if minpri is None:
        minpri = 0b11

    header_val = (majpri << 14) + (minpri << 12) + (can_id << 5)
    # Zero pad to four hex digits; keep only the low four if it is longer
    header_hex = f"{header_val:04X}"[-4:]
    return f':S{header_hex}N'

parse_input(input_bytes)

Parse a raw CBUS packet as an input bytestring

Take a bytestring (or string) from the CBUS and extract the details

Parameters:

Name Type Description Default
input_types bytestring

Input raw bytestring (or string)

required

Returns:

Name Type Description
VLCBFormat VLCBFormat

parsed data in VLCBFormat

Raises:

Type Description
ValueError

If invalid data string

Source code in src/pyvlcb/__init__.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def parse_input(self, input_bytes: bytes) -> VLCBFormat:
    """Parse a raw CBUS packet as an input bytestring

    Take a bytestring (or string) from the CBUS and extract the details.
    The positions read are: ':' at 0, 'S' at 1, four hex header digits
    at 2-5, the N / RTR flag at 6, then the data up to the final character.

    Args: 
        input_bytes (bytestring): Input raw bytestring (or string)

    Returns:
        VLCBFormat: parsed data in VLCBFormat

    Raises:
        ValueError: If invalid data string

    """
    # Also allow string (no need to decode)
    if isinstance (input_bytes, str):
        input_string = input_bytes
    else:
        input_string = input_bytes.decode("utf-8")
    # Everything up to and including the N / RTR flag (index 6) must be
    # present, otherwise the fixed position reads below would fail
    if len(input_string) < 7:
        raise ValueError(f"input_bytes '{input_string}' is too short.")
    if input_string[0] != ":":
        raise ValueError(f"No start frame in '{input_string}'")
    if input_string[1] != "S":
        raise ValueError(f"Format not supported - only Standard frames allowed in {input_string}")
    header = input_string[2:6]
    try:
        header_val = int(header, 16)
    except ValueError as e:
        raise ValueError(f"Invalid format, number expected {header}") from e
    logger.debug (f"Header {hex(header_val)}")
    # Top four bits hold the priority
    priority = (header_val & 0xf000) >> 12
    logger.debug (f"Priority {priority:b}")
    # Seven bits starting at bit 5 hold the CAN ID
    can_id = (header_val & 0xfe0) >> 5
    logger.debug(f"Can ID {can_id}")
    # Next is N / RTR can be ignored
    logger.debug(f"N / RTR {input_string[6]}")
    # Data is rest excluding ; 
    data = input_string[7:-1]
    logger.debug(f"Data {data}")
    # Creates a VLCB_format and returns that
    return VLCBFormat (priority, can_id, data)

release_loco(session_id)

Create a release loco request

Uses KLOC (21)

Parameters:

Name Type Description Default
session_id int

Session number

required

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
330
331
332
333
334
335
336
337
338
339
340
341
def release_loco (self, session_id: int) -> str:
    """Build a release loco request

    Uses KLOC (21)

    Args:
        session_id: Session number

    Returns:
        String: A string for the request
    """
    header = self.make_header(opcode='21')
    session_hex = num_to_1hexstr(session_id)
    # Frame is header, opcode, session byte, terminator
    return f"{header}21{session_hex};"

share_loco(loco_id, long=True)

Create a share loco request

Takes the loco; other connections to the loco can remain. Uses GLOC (61)

Parameters:

Name Type Description Default
loco_id int

Loco ID (long or short number)

required
long Optional[bool]

Long loco ID (True) or short loco ID (False)

True

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
def share_loco (self, loco_id: int, long: Optional[bool] = True) -> str:
    """Build a share loco request

    Takes the loco; other connections to the loco can remain.
    Uses GLOC (61) with the flags byte set to 02.

    Args:
        loco_id: Loco ID (long or short number)
        long: Long loco ID (True) or short loco ID (False)

    Returns:
        String: A string for the request

    Raises:
        InvalidLocoError: If a short loco ID of 127 or above is given
    """
    # NOTE(review): a short ID of exactly 127 is rejected here - confirm intended
    short_requested = long == False
    if short_requested and loco_id >= 127:
        raise InvalidLocoError(f"Invalid short code {loco_id}")
    # Long IDs are marked by setting bits 14 and 15 (0xC000)
    address = (loco_id | 0xC000) if long == True else loco_id
    header = self.make_header(opcode='61')
    # GLOC flags byte: 01 = steal, 02 = share
    return f"{header}61{num_to_2hexstr(address)}02;"

steal_loco(loco_id, long=True)

Create a steal loco request

Takes the loco; other connections to the loco should be terminated.
Uses GLOC (61)

Parameters:

Name Type Description Default
loco_id int

Loco ID (long or short number)

required
long Optional[bool]

Long loco ID (True) or short loco ID (False)

True

Returns:

Name Type Description
String str

A string for the request

Source code in src/pyvlcb/__init__.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def steal_loco (self, loco_id: int, long: Optional[bool] = True) -> str:
    """Build a steal loco request

    Takes the loco; other connections to the loco should be terminated.
    Uses GLOC (61) with the flags byte set to 01.

    Args:
        loco_id: Loco ID (long or short number)
        long: Long loco ID (True) or short loco ID (False)

    Returns:
        String: A string for the request

    Raises:
        InvalidLocoError: If a short loco ID of 127 or above is given
    """
    # NOTE(review): a short ID of exactly 127 is rejected here - confirm intended
    short_requested = long == False
    if short_requested and loco_id >= 127:
        raise InvalidLocoError(f"Invalid short code {loco_id}")
    # Long IDs are marked by setting bits 14 and 15 (0xC000)
    address = (loco_id | 0xC000) if long == True else loco_id
    header = self.make_header(opcode='61')
    # GLOC flags byte: 01 = steal, 02 = share
    return f"{header}61{num_to_2hexstr(address)}01;"

Handle USB serial communication to CANUSB4

Uses pyserial to communicate over USB.

Attributes:

Name Type Description
port

The usb port eg. /dev/ttyACM0 (RPi)

Source code in src/pyvlcb/canusb.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class CanUSB4 ():
    """Handle USB serial communication to CANUSB4

    Uses pyserial to communicate over USB. Incoming bytes are assembled
    into packets that start with ':' and end with ';'.

    Attributes:
        port: The usb port eg. /dev/ttyACM0 (RPi)
        baud: Baud rate handed to pyserial
        timeout: Serial timeout handed to pyserial (seconds)
        exclusive: Exclusive access flag handed to pyserial
        current_buffer: Partial packet kept between read_data calls
        data_start: True while read_data is inside a ':' ... ';' packet
    """
    def __init__ (self, 
                  port: str, 
                  baud: Optional[int] = 115200, 
                  timeout: Optional[float] = 0.01,
                  exclusive: Optional[bool] = True) -> None:
        """Inits CanUSB4 with a USB port and opens the connection

        Args:
            port: USB port eg. /dev/ttyACM0 (RPi)
            baud: Baud rate
            timeout: How long to wait for a serial timeout (seconds)
            exclusive: Whether to request exclusive use of the port

        Raises:
            DeviceConnectionError: If the port cannot be opened or is already in use.
            InvalidConfigurationError: If the port name is empty or invalid.

        """
        self.debug = False
        self.port = port
        self.baud = baud
        self.timeout = timeout
        self.max_retry = 30    # How many times to attempt on get_data must be at least as long as frame
        # Timeout for a request could be max_rety x timeout
        self.exclusive = exclusive	# Exclusive determines if check for exclusive use of the USB port
        # If this is set to false then if another application is already using the port then the application
        # will run, but appear to hang if there is no response because another thread has already
        # taken the incoming data

        if not port:
            raise InvalidConfigurationError("Port name cannot be empty")

        # buffer to hold partial string - allows us to continue if read ends partway through a packet
        self.current_buffer = ''
        # Track if we are in a valid string (ie. ignore any data outside of : ; blocks
        self.data_start = False
        self.connect()


    # Optional arguments override existing
    def connect(self,
                port: Optional[str] = None,
                baud: Optional[int] = None,
                timeout: Optional[float] = None,
                exclusive: Optional[bool] = None) -> None:
        """Open the serial connection, optionally overriding stored settings

        Any argument left as None keeps the value already stored on the
        instance; any argument given replaces the stored value first.

        Args:
            port: USB port eg. /dev/ttyACM0 (RPi)
            baud: Baud rate
            timeout: How long to wait for a serial timeout (seconds)
            exclusive: Whether to request exclusive use of the port

        Raises:
            DeviceConnectionError: If the port cannot be opened or is already in use.
            InvalidConfigurationError: If the port name is empty or invalid.

        """
        if port is not None:
            self.port = port
        if not self.port:
            raise InvalidConfigurationError("Port name cannot be empty")
        if baud is not None:
            self.baud = baud
        if timeout is not None:
            self.timeout = timeout
        if exclusive is not None:
            self.exclusive = exclusive
        try:
            # Use the stored exclusive setting: the local argument is None
            # whenever the caller (including __init__) did not override it
            self.ser = serial.Serial(
                self.port,
                self.baud,
                timeout=self.timeout,
                exclusive=self.exclusive
                )
        except serial.SerialException as e:
            raise DeviceConnectionError(f"Could not open port {self.port}") from e
        if self.ser:
            logger.info("Connected to serial port")


    # Data can either be string or bytestring
    def send_data(self, data: Union[str, bytes]) -> None:
        """Send data to serial

        Args:
            data: Data to send, normally from a VLCB method

        Raises:
            InvalidConfigurationError: If string contains invalid characters
            TypeError: If data passed is not a string or a bytestring
            DeviceConnectionError: Error sending data - possible connection lost
        """
        logger.debug(f"Sending {data}")
        if isinstance(data, str):
            try:
                # Convert string to bytes
                payload = data.encode('ascii') # using ascii which is more restrictive than default "utf-8"
            except UnicodeEncodeError as e:
                raise InvalidConfigurationError(f"String contains invalid characters: {data}") from e
        elif isinstance(data, bytes):
            # It's already bytes, just use it
            payload = data
        else:
            # User sent an int, list, or something else weird
            raise TypeError(f"Expected str or bytes, got {type(data).__name__}")

        # Send payload which is now bytes
        try:
            self.ser.write(payload)
        except serial.SerialException as e:
            raise DeviceConnectionError("Connection lost during write") from e

    def read_data(self) -> List[str]:
        """Read data from CanUSB4

        Reads whatever is waiting on the serial port and returns every
        packet completed by it. An unfinished packet is kept in
        current_buffer and continued on the next call.

        Returns:
            List: List of strings for all data read

        Raises:
            DeviceConnectionError: Error receiving data - possible connection lost

        """
        # As each data string is read then it is stored into this list
        # Which allows all new packets to be returned
        received_data = []
        try:
            # Querying the port can fail as well as reading it, so both are guarded
            num_bytes = self.ser.in_waiting
            # Read even a single byte: it may be the ';' that ends a packet
            if num_bytes < 1:
                return received_data
            in_chars = self.ser.read(num_bytes)
        except serial.SerialException as e:
            raise DeviceConnectionError("Connection lost during read") from e
        # Any other error
        except Exception as e:
            raise DeviceConnectionError("Unable to read other error") from e

        for byte_value in in_chars:
            this_char = chr(byte_value)
            # End of packet
            if this_char == ';':
                # Check we have some data if not then ignore
                if not self.current_buffer:
                    continue
                # Add the terminating char
                self.current_buffer += this_char
                logger.debug (f"Read {self.current_buffer}")
                received_data.append(self.current_buffer)
                # delete the data
                self.current_buffer = ''
                # no longer inside a data packet
                self.data_start = False
            # Start of packet (resets string even if previous data)
            elif this_char == ':':
                self.data_start = True
                self.current_buffer = ':'
            # Only add character if we are inside a data block
            elif self.data_start:
                self.current_buffer += this_char
            # Anything else is outside a data block and is dropped
        return received_data

__init__(port, baud=115200, timeout=0.01, exclusive=True)

Inits CanUSB4 with a USB port

Parameters:

Name Type Description Default
port str

USB port eg. /dev/ttyACM0 (RPi)

required
baud Optional[int]

Baud rate in bytes

115200
timeout Optional[float]

How long to wait for a serial timeout (seconds)

0.01

Raises:

Type Description
DeviceConnectionError

If the port cannot be opened or is already in use.

InvalidConfigurationError

If the port name is empty or invalid.

Source code in src/pyvlcb/canusb.py
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def __init__ (self, 
              port: str, 
              baud: Optional[int] = 115200, 
              timeout: Optional[float] = 0.01,
              exclusive: Optional[bool] = True) -> None:
    """Store the serial settings and open the connection to the CANUSB4

    Args:
        port: USB port eg. /dev/ttyACM0 (RPi)
        baud: Baud rate
        timeout: How long to wait for a serial timeout (seconds)
        exclusive: Whether to request exclusive use of the port

    Raises:
        DeviceConnectionError: If the port cannot be opened or is already in use.
        InvalidConfigurationError: If the port name is empty or invalid.

    """
    # Connection settings as supplied by the caller
    self.port, self.baud, self.timeout = port, baud, timeout
    # With exclusive set to False a second application can open the same
    # port; this one then still runs but can appear to hang, because the
    # other reader may already have taken the incoming data
    self.exclusive = exclusive
    self.debug = False
    # How many attempts on get_data - must be at least as long as a frame,
    # so a request could take up to max_retry x timeout
    self.max_retry = 30

    if not port:
        raise InvalidConfigurationError("Port name cannot be empty")

    # Reader state: not inside a ':' ... ';' block yet, and no partial
    # packet held over from an earlier read
    self.data_start = False
    self.current_buffer = ''
    self.connect()

connect(port=None, baud=None, timeout=None, exclusive=None)

Inits CanUSB4 with a USB port

Parameters:

Name Type Description Default
port Optional[str]

USB port eg. /dev/ttyACM0 (RPi)

None
baud Optional[int]

Baud rate in bytes

None
timeout Optional[float]

How long to wait for a serial timeout (seconds)

None

Raises:

Type Description
DeviceConnectionError

If the port cannot be opened or is already in use.

InvalidConfigurationError

If the port name is empty or invalid.

Source code in src/pyvlcb/canusb.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def connect(self,
            port: Optional[str] = None,
            baud: Optional[int] = None,
            timeout: Optional[float] = None,
            exclusive: Optional[bool] = None) -> None:
    """Open the serial connection, optionally overriding stored settings

    Any argument left as None keeps the value already stored on the
    instance; any argument given replaces the stored value first.

    Args:
        port: USB port eg. /dev/ttyACM0 (RPi)
        baud: Baud rate
        timeout: How long to wait for a serial timeout (seconds)
        exclusive: Whether to request exclusive use of the port

    Raises:
        DeviceConnectionError: If the port cannot be opened or is already in use.
        InvalidConfigurationError: If the port name is empty or invalid.

    """
    if port is not None:
        self.port = port
    if not self.port:
        raise InvalidConfigurationError("Port name cannot be empty")
    if baud is not None:
        self.baud = baud
    if timeout is not None:
        self.timeout = timeout
    if exclusive is not None:
        self.exclusive = exclusive
    try:
        # Use the stored exclusive setting: the local argument is None
        # whenever the caller did not override it
        self.ser = serial.Serial(
            self.port,
            self.baud,
            timeout=self.timeout,
            exclusive=self.exclusive
            )
    except serial.SerialException as e:
        raise DeviceConnectionError(f"Could not open port {self.port}") from e
    if self.ser:
        logger.info("Connected to serial port")

read_data()

Read data from CanUSB4

Returns:

Name Type Description
List List[str]

List of strings for all data read

Raises:

Type Description
DeviceConnectionError

Error receiving data - possible connection lost

Source code in src/pyvlcb/canusb.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
def read_data(self) -> List[str]:
    """Read data from CanUSB4

    Reads whatever is waiting on the serial port and returns every
    packet completed by it. An unfinished packet is kept in
    current_buffer and continued on the next call.

    Returns:
        List: List of strings for all data read

    Raises:
        DeviceConnectionError: Error receiving data - possible connection lost

    """
    # As each data string is read then it is stored into this list
    # Which allows all new packets to be returned
    received_data = []
    try:
        # Querying the port can fail as well as reading it, so both are guarded
        num_bytes = self.ser.in_waiting
        # Read even a single byte: it may be the ';' that ends a packet
        if num_bytes < 1:
            return received_data
        in_chars = self.ser.read(num_bytes)
    except serial.SerialException as e:
        raise DeviceConnectionError("Connection lost during read") from e
    # Any other error
    except Exception as e:
        raise DeviceConnectionError("Unable to read other error") from e

    for byte_value in in_chars:
        this_char = chr(byte_value)
        # End of packet
        if this_char == ';':
            # Check we have some data if not then ignore
            if not self.current_buffer:
                continue
            # Add the terminating char
            self.current_buffer += this_char
            logger.debug (f"Read {self.current_buffer}")
            received_data.append(self.current_buffer)
            # delete the data
            self.current_buffer = ''
            # no longer inside a data packet
            self.data_start = False
        # Start of packet (resets string even if previous data)
        elif this_char == ':':
            self.data_start = True
            self.current_buffer = ':'
        # Only add character if we are inside a data block
        elif self.data_start:
            self.current_buffer += this_char
        # Anything else is outside a data block and is dropped
    return received_data

send_data(data)

Send data to serial

Parameters:

Name Type Description Default
data Union[str, bytes]

Data to send, normally from a VLCB method

required

Raises:

Type Description
InvalidConfigurationError

If string contains invalid characters

TypeError

If data passed is not a string or a bytestring

DeviceConnectionError

Error sending data - possible connection lost

Source code in src/pyvlcb/canusb.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def send_data(self, data: Union[str, bytes]) -> None:
    """Write a request to the serial port

    Args:
        data: Data to send, normally from a VLCB method

    Raises:
        InvalidConfigurationError: If string contains invalid characters
        TypeError: If data passed is not a string or a bytestring
        DeviceConnectionError: Error sending data - possible connection lost
    """
    logger.debug(f"Sending {data}")
    if isinstance(data, bytes):
        # Bytes can be written as they are
        payload = data
    elif isinstance(data, str):
        # ascii is deliberately stricter than the default "utf-8"
        try:
            payload = data.encode('ascii')
        except UnicodeEncodeError as e:
            raise InvalidConfigurationError(f"String contains invalid characters: {data}") from e
    else:
        # Anything else (int, list, ...) is rejected
        raise TypeError(f"Expected str or bytes, got {type(data).__name__}")

    # payload is bytes from here on
    try:
        self.ser.write(payload)
    except serial.SerialException as e:
        raise DeviceConnectionError("Connection lost during write") from e

Handles a single VLCB packet

Attributes:

Name Type Description
priority

CAN priority

can_id

CAN ID

data

Remaining data as a hex str

Source code in src/pyvlcb/vlcbformat.py
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class VLCBFormat :
    """ Handles a single VLCB packet

    Attributes:
        priority: CAN priority
        can_id: CAN ID
        data: Remaining data as a hex str

    """ 

    def __init__ (self, priority: int, can_id: int, data: str) -> None:
        """Inits VLCBformat

        Args:
            priority: CAN priority
            can_id: CAN ID
            data: Remaining data as a hex string

        """
        self.priority = priority # Priority is actually high and low priority (2bit high / 2bit low) but just treated as single value
        self.can_id = can_id
        self.data = data # Data is left as hex string

    # Lookup OpCode
    def opcode (self):
        """Returns the opcode associated with the data string

        The first two characters of data are looked up in VLCBOpcode.opcodes.

        Returns:
            The 'opc' entry for the opcode (compared against names such
            as "PLOC" and "ERR" elsewhere in this class)

        Raises:
            ValueError: If opcode not found
        """
        str_value = self.data[0:2]
        if str_value in VLCBOpcode.opcodes:
            return VLCBOpcode.opcodes[str_value]['opc']
        raise ValueError(f"Opcode {str_value} is not defined.")

    def get_data (self) -> OpcodeData:
        """Returns the packet data parsed by VLCBOpcode.parse_data

        Returns:
            OpcodeData: Dict from the VLCBOpcode

        Raises:
            Any exception raised by VLCBOpcode.parse_data
        """
        return VLCBOpcode.parse_data(self.data)

    def format_data (self) -> OpcodeData:
        """Returns the packet data parsed by VLCBOpcode.parse_data

        Currently returns the same value as get_data.

        Returns:
            OpcodeData: Dict from the VLCBOpcode

        Raises:
            Any exception raised by VLCBOpcode.parse_data
        """
        return VLCBOpcode.parse_data(self.data)

    def get_loco_id (self) -> int:
        """Converts AddrHigh and AddrLow into a loco_id

        Only valid with certain VLCBFormat packets associated with Locos.
        If the packet does not contain the AddrHigh & AddrLow values (which
        may be formatted differently) then raises an InvalidLocoError

        Returns:
            loco_id: Loco number

        Raises:
            InvalidLocoError: If AddrHigh / AddrLow are not in the packet
            ValueError: If the opcode is not defined
        """
        opcode = self.opcode()
        if opcode == "PLOC":
            data_dict = VLCBOpcode.parse_data(self.data)
            # Mask off the top two bits to leave the loco number
            return data_dict['AddrHigh_AddrLow'] & 0x3FFF
        if opcode == "ERR":
            # also check it's one of the Error codes associated with allocate loco etc.
            # 1 = loco stack full 2 = loco taken, 7 = invalid request
            # The following are not supported as data bytes contain session / consist ID and not loco_id
            # 3 = no session, 4 consist empty, 5 loco not found, 6 can bus error
            data_dict = VLCBOpcode.parse_data(self.data)
            if data_dict["ErrCode"] in [1, 2, 7]:
                return bytes_to_addr(data_dict['Byte1'],data_dict['Byte2']) & 0x3FFF
            raise InvalidLocoError (f"Error code {data_dict['ErrCode']} does not contain a loco_id")
        # Any other opcode carries no loco_id
        raise InvalidLocoError(f"Opcode {opcode} does not contain a loco_id")

    def get_function_list (self) -> List[int]:
        """Returns the function states where the packet contains
        Fn1, Fn2, Fn3 (eg. PLOC)

        Only valid with certain VLCBFormat packets associated with Locos.
        If the packet does not contain the 3 byte values (which
        may be formatted differently) then raises an InvalidLocoError

        Returns:
            List of function values as 0 or 1 for each function as off and on

        Raises:
            InvalidLocoError: If Function Bytes are not in the packet
            ValueError: If the opcode is not defined
        """
        opcode = self.opcode()
        if opcode == "PLOC":
            data_dict = VLCBOpcode.parse_data(self.data)
            return bytes_to_functions (data_dict['Fn1'], data_dict['Fn2'], data_dict['Fn3'])
        # Exception type is kept for existing callers; only the message is corrected
        raise InvalidLocoError(f"Opcode {opcode} does not contain function data")

    def __str__ (self):
        return f'{self.priority} : {self.can_id} : {self.opcode()} ({self.data[0:2]}) : {self.data} / {self.format_data()}'

__init__(priority, can_id, data)

Inits VLCBformat

Parameters:

Name Type Description Default
priority int

CAN priority

required
can_id int

CAN ID

required
data str

Remaining data as a hex string

required
Source code in src/pyvlcb/vlcbformat.py
25
26
27
28
29
30
31
32
33
34
35
36
def __init__ (self, priority: int, can_id: int, data: str) -> None:
    """Store the fields of a single VLCB packet

    Args:
        priority: CAN priority
        can_id: CAN ID
        data: Remaining data as a hex string

    """
    # priority is really a 2 bit high and a 2 bit low priority, but is
    # held as one value; data is kept as the hex string it arrived as
    self.priority, self.can_id, self.data = priority, can_id, data

format_data()

Returns the opcode associated with the data string

Returns:

Name Type Description
OpcodeData OpcodeData

Dict from the VLCBOpcode

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
65
66
67
68
69
70
71
72
73
74
def format_data (self) -> OpcodeData:
    """Returns the packet data parsed by VLCBOpcode.parse_data

    Returns:
        OpcodeData: Dict from the VLCBOpcode

    Raises:
        Any exception raised by VLCBOpcode.parse_data
    """
    parsed = VLCBOpcode.parse_data(self.data)
    return parsed

get_data()

Returns the opcode associated with the data string as a dict

Returns:

Name Type Description
OpcodeData OpcodeData

Dict from the VLCBOpcode

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
54
55
56
57
58
59
60
61
62
63
def get_data (self) -> OpcodeData:
    """Returns the packet data parsed by VLCBOpcode.parse_data as a dict

    Returns:
        OpcodeData: Dict from the VLCBOpcode

    Raises:
        Any exception raised by VLCBOpcode.parse_data
    """
    parsed = VLCBOpcode.parse_data(self.data)
    return parsed

get_function_list()

Where the packet contains Fn1, Fn2, Fn3 (eg. PLOC), returns the function states as a list

Only valid with certain VLCBFormat packets associated with Locos. If the packet does not contain the 3 byte values (which may be formatted differently) then raises an InvalidLocoError

Returns:

Type Description
List[int]

List of function values as 0 or 1 for each function as off and on

Raises:

Type Description
InvalidLocoError

If Function Bytes are not in the packet

Source code in src/pyvlcb/vlcbformat.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def get_function_list (self) -> List[int]:
    """Return the loco function states carried in the packet

    Only valid with VLCBFormat packets that carry the Fn1, Fn2 and Fn3
    bytes. At present the only opcode accepted here is PLOC; any other
    opcode raises InvalidFunctionError.

    Returns:
        List of function values as 0 or 1 for each function as off and on
        (as produced by bytes_to_functions)

    Raises:
        InvalidFunctionError: If Function Bytes are not in the packet
        ValueError: If the opcode is not defined (raised by opcode())
    """
    # Look the opcode up once; it is needed for both the check and the message
    opcode = self.opcode()
    if opcode != "PLOC":
        # NOTE(review): previously raised InvalidLocoError with a loco_id message,
        # contradicting the documented contract. Confirm InvalidFunctionError is
        # imported in this module alongside InvalidLocoError.
        raise InvalidFunctionError(f"Opcode {opcode} does not contain function bytes")

    # Get data
    data_dict = VLCBOpcode.parse_data(self.data)
    return bytes_to_functions (data_dict['Fn1'], data_dict['Fn2'], data_dict['Fn3'])

get_loco_id()

Converts AddrHigh and AddrLow into a loco_id

Only valid with certain VLCBFormat packets associated with Locos. If the packet does not contain the AddrHigh & AddrLow values (which may be formatted differently) then an InvalidLocoError is raised

Returns:

Name Type Description
loco_id int

Loco number

Raises:

Type Description
InvalidLocoError

If AddrHigh / AddrLow are not in the packet

Source code in src/pyvlcb/vlcbformat.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
def get_loco_id (self) -> int:
    """Extract the loco_id from a loco related packet

    Only valid with certain VLCBFormat packets associated with Locos:

    * PLOC - the loco_id is taken from the AddrHigh_AddrLow field
    * ERR  - only for error codes 1, 2 and 7, where Byte1 / Byte2 are
      combined into an address with bytes_to_addr

    In both cases the value is masked with 0x3FFF so only the low 14 bits
    are returned.

    Returns:
        loco_id: Loco number

    Raises:
        InvalidLocoError: If AddrHigh / AddrLow are not in the packet
        ValueError: If the opcode is not defined (raised by opcode())
    """
    # Look the opcode up once rather than on every comparison
    opcode = self.opcode()

    if opcode == "PLOC":
        # Get data
        data_dict = VLCBOpcode.parse_data(self.data)
        return data_dict['AddrHigh_AddrLow'] & 0x3FFF

    if opcode == "ERR":
        # also check it's one of the Error codes associated with allocate loco etc.
        # 1 = loco stack full 2 = loco taken, 7 = invalid request
        # The following are not supported as data bytes contain session / consist ID and not loco_id
        # 3 = no session, 4 consist empty, 5 loco not found, 6 can bus error
        data_dict = VLCBOpcode.parse_data(self.data)
        if data_dict["ErrCode"] not in (1, 2, 7):
            raise InvalidLocoError (f"Error code {data_dict['ErrCode']} does not contain a loco_id")
        return bytes_to_addr(data_dict['Byte1'],data_dict['Byte2']) & 0x3FFF

    # Any other opcode does not carry a loco address
    raise InvalidLocoError(f"Opcode {opcode} does not contain a loco_id")

opcode()

Returns the opcode associated with the data string

Returns:

Type Description

The opcode mnemonic (the 'opc' entry, e.g. 'ACK') for the opcode at the start of the data string

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def opcode (self) -> str:
    """Returns the opcode mnemonic associated with the data string

    The first two characters of ``self.data`` are used as the key into
    ``VLCBOpcode.opcodes`` and the ``'opc'`` entry of the matching record
    is returned (e.g. ``'ACK'`` for ``'00'``).

    Returns:
        The opcode mnemonic string, not the full opcode dictionary

    Raises:
        ValueError: If opcode not found
    """
    str_value = self.data[0:2]
    try:
        # Single lookup instead of a membership test followed by a second lookup
        opcode_info = VLCBOpcode.opcodes[str_value]
    except KeyError:
        # Callers expect ValueError for an unknown opcode; hide the internal KeyError
        raise ValueError(f"Opcode {str_value} is not defined.") from None
    return opcode_info['opc']

List of opcodes and other related data

Includes format information and user friendly strings

Attributes:

Name Type Description
opcodes

Dict of opcodes indexed by opcode number as a hex string

field_formats

Dict of data type and number of characters for each field

accessory_codes

Dict of accessory on and off codes

Source code in src/pyvlcb/vlcbformat.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
class VLCBOpcode:
    """List of opcodes and other related data

    Includes format information and user friendly strings

    Attributes:
        opcodes: Dict of opcodes indexed by opcode number as a hex string
        field_formats: Dict of data type and number of characters for each field
        accessory_codes: Dict of accessory on and off codes

    """
    # Dict from opcode to dict of opcode information
    opcodes = {
        '00':  {'opc': 'ACK', 'title': 'General Acknowledgement', 'format': '', 'minpri': 2, 'comment': 'Positive response to query/request performed for report of availability online'},
        '01':  {'opc': 'NAK', 'title': 'General No Ack', 'format': '', 'minpri': 2, 'comment': 'Negative response to query/request denied'},
        '02':  {'opc': 'HLT', 'title': 'Bus Halt', 'format': '', 'minpri': 0, 'comment': 'Commonly broadcasted to all nodes to indicate CBUS is not available and no further packets should be sent until a BON or ARST is received'},
        '03':  {'opc': 'BON', 'title': 'Bus On', 'format': '', 'minpri': 1, 'comment': 'Commonly broadcasted to all nodes to indicate CBUS is available following a HLT.'},
        '04':  {'opc': 'TOF', 'title': 'Track Off', 'format': '', 'minpri': 1, 'comment': 'Commonly broadcasted to all nodes by a command station to indicate track power is off and no further command packets should be sent, except inquiries..'},
        '05':  {'opc': 'TON', 'title': 'Track On', 'format': '', 'minpri': 1, 'comment': 'Commonly broadcasted to all nodes by a command station to indicate track power is on.'},
        '06':  {'opc': 'ERSTOP', 'title': 'Emergency Stop', 'format': '', 'minpri': 1, 'comment': 'Commonly broadcase to all nodes by a command station to indicate all engines have been emergency stopped.'},
        '07':  {'opc': 'ARST', 'title': 'System Reset', 'format': '', 'minpri': 0, 'comment': 'Commonly broadcasted to all nodes to indicate a full system reset.'},
        '08':  {'opc': 'RTOF', 'title': 'Request Track Off', 'format': '', 'minpri': 1, 'comment': 'Sent to request change of track power to off.'},
        '09':  {'opc': 'RTON', 'title': 'Request Track On', 'format': '', 'minpri': 1, 'comment': 'Sent to request change of track power to on.'},
        '0A':  {'opc': 'RESTP', 'title': 'Request Emergency Stop All', 'format': '', 'minpri': 0, 'comment': 'Sent to request an emergency stop to all trains . Does not affect accessory control.'},
        '0C':  {'opc': 'RSTAT', 'title': 'Request Command Station Status', 'format': '', 'minpri': 2, 'comment': 'Sent to query the status of the command station. See description of (STAT) for the response from the command station.'},
        '0D':  {'opc': 'QNN', 'title': 'Query node number', 'format': '', 'minpri': 3, 'comment': 'Sent by a node to elicit a PNN reply from each node on the bus that has a node number. See OpCode 0xB6'},
        '10':  {'opc': 'RQNP', 'title': 'Request node parameters', 'format': '', 'minpri': 3, 'comment': 'Sent to a node while in ?setup?mode to read its parameter set. Used when initially configuring a node.'},
        '11':  {'opc': 'RQMN', 'title': 'Request module name', 'format': '', 'minpri': 2, 'comment': 'Sent by a node to request the name of the type of module that is in setup mode. The module in setup mode will reply with opcode NAME. See OpCode 0xE2'},
        # Session is the engine session number as HEX byte.
        '21':  {'opc': 'KLOC', 'title': 'Release Engine', 'format': 'Session', 'minpri': 2, 'comment': 'Sent by a CAB to the Command Station. The engine with that Session number is removed from the active engine list.'},
        '22':  {'opc': 'QLOC', 'title': 'Query Engine', 'format': 'Session', 'minpri': 2, 'comment': 'The command station responds with PLOC if the session is assigned. Otherwise responds with ERR: engine not found.'},
        '23':  {'opc': 'DKEEP', 'title': 'Session keep alive', 'format': 'Session', 'minpri': 2, 'comment': 'The cab sends a keep alive at regular intervals for the active session. The interval between keep alive messages must be less than the session timeout implemented by the command station.'},
        '30':  {'opc': 'DBG1', 'title': 'Debug with one data byte', 'format': 'Status', 'minpri': 2, 'comment': '<Dat1> is a freeform status byte for debugging during CBUS module development. Not used during normal operation'},
        '3F':  {'opc': 'EXTC', 'title': 'Extended op-code with no additional bytes', 'format': 'ExtOpc', 'minpri': 3, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'},
        # AddrHigh_AddrLow = address of the decoder. 7-bit addresses have AddrH = 0; 14-bit addresses have bits 6 and 7 of AddrH set to 1.
        '40':  {'opc': 'RLOC', 'title': 'Request engine session', 'format': 'AddrHigh_AddrLow', 'minpri': 2, 'comment': 'The command station responds with (PLOC) if engine is free and is being assigned. Otherwise responds with (ERR): engine in use or (ERR:) stack full. This command is typically sent by a cab to the command station following a change of the controlled decoder address. RLOC is exactly equivalent to GLOC with all flag bits set to zero, but command stations must continue to support RLOC for backwards compatibility.'},
        # Consist = Consist address
        # Index = Engine index of the consist
        '41':  {'opc': 'QCON', 'title': 'Query Consist', 'format': 'Consist,Index', 'minpri': 2, 'comment': 'Allows enumeration of a consist. Command station responds with PLOC if an engine exists at the specified index, otherwise responds with ERR: no more engines'},
        # NN = node number (NNHigh = high byte of the node number, NNLow = low byte of the node number)
        '42':  {'opc': 'SNN', 'title': 'Set Node Number', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to assign a node number to a requesting node in response to a RQNN message. The target node must be in ?setup? mode.'},
        # AllocCode = specific allocation code (1 byte)
        '43':  {'opc': 'ALOC', 'title': 'Allocate loco to activity', 'format': 'Session,AllocCode', 'minpri': 2, 'comment': ''},
        '44':  {'opc': 'STMOD', 'title': 'Set CAB session mode', 'format': 'Session,Mode', 'minpri': 2, 'comment': 'MMMMMMMM = mode bits: 0 ? 1: speed mode; 00 ? 128 speed steps; 01 ? 14 speed steps; 10 ? 28 speed steps with interleave steps; 11 ? 28 speed steps; 2: service mode; 3: sound control mode'},
        # Consist is consist address (8 bits)
        '45':  {'opc': 'PCON', 'title': 'Consist Engine', 'format': 'Session,Consist', 'minpri': 2, 'comment': 'Adds a decoder to a consist. Dat2 has bit 7 set if consist direction is reversed.'},
        '46':  {'opc': 'KCON', 'title': 'Remove Engine from consist', 'format': 'Session,Consist', 'minpri': 2, 'comment': 'Removes a loco from a consist.'},
        # SpeedDir = speed/direction value: the most significant bit is the direction and the remaining 7 bits are the unsigned speed value.
        '47':  {'opc': 'DSPD', 'title': 'Set Engine Speed/Dir', 'format': 'Session,SpeedDir', 'minpri': 0, 'comment': 'the unsigned speed value. Sent by a CAB or equivalent to request an engine speed/dir change.'},
        # SpeedFlag = speed flags byte (the bit layout is given in the DFLG 'comment' text below)
        '48':  {'opc': 'DFLG', 'title': 'Set Engine Flags', 'format': 'Session,SpeedFlag', 'minpri': 2, 'comment': 'Bits 0-1: Speed Mode 00 ? 128 speed steps; 01 ? 14 speed steps; 10 ? 28 speed steps with interleave steps; 11 ? 28 speed steps Bit 2: Lights On/OFF; Bit 3: Engine relative direction; Bits 4-5: Engine state (active =0 , consisted =1, consist master=2, inactive=3) Bits 6-7: Reserved.; Sent by a cab to notify the command station of a change in engine flags.'},
        # Fnum = Function number, 0 to 27
        '49':  {'opc': 'DFNON', 'title': 'Set Engine function on', 'format': 'Session,Fnum', 'minpri': 2, 'comment': 'Sent by a cab to turn on a specific loco function. This provides an alternative method to DFUN for controlling loco functions. A command station must implement both methods.'},
        '4C':  {'opc': 'SSTAT', 'title': 'Service mode status', 'format': 'Session,Status', 'minpri': 3, 'comment': 'Status returned by command station/programmer at end of programming operation that does not return data.'},
        '50':  {'opc': 'RQNN', 'title': 'Request node number', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a node that is in setup/configuration mode and requests assignment of a node number (NN). The node allocating node numbers responds with (SNN) which contains the newly assigned node number. <NN hi> and <NN lo> are the existing node number, if the node has one. If it does not yet have a node number, these bytes should be set to zero.'},
        '51':  {'opc': 'NNREL', 'title': 'Node number release', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by node when taken out of service. e.g. when reverting to SLiM mode.'},
        '52':  {'opc': 'NNACK', 'title': 'Node number acknowledge', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a node to verify its presence and confirm its node id. This message is sent to acknowledge an SNN.'},
        '53':  {'opc': 'NNLRN', 'title': 'Set node into learn mode', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to put a specific node into learn mode. Deprecated - replaced by MODE'},
        '54':  {'opc': 'NNULN', 'title': 'Release node from learn mode', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to take node out of learn mode and revert to normal operation.'},
        '55':  {'opc': 'NNCLR', 'title': 'Clear all events from a node', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to clear all events from a specific node. Must be in learn mode first to safeguard against accidental erasure of all events.'},
        '56':  {'opc': 'NNEVN', 'title': 'Read number of events available in a node', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to read the number of available event slots in a node.Response is EVLNF (0x70)'},
        '57':  {'opc': 'NERD', 'title': 'Read back all stored events in a node', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to read all the stored events in a node. Response is 0xF2.'},
        '58':  {'opc': 'RQEVN', 'title': 'Request to read number of stored events', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a configuration tool to read the number of stored events in a node. Response is 0x74( NUMEV).'},
        '59':  {'opc': 'WRACK', 'title': 'Write acknowledge', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by a node to indicate the completion of a write to memory operation. All nodes must issue WRACK when a write operation to node variables, events or event variables has completed. This allows for teaching nodes where the processing time may be slow. Deprecated replaced by GRSP'},
        '5A':  {'opc': 'RQDAT', 'title': 'Request node data event', 'format': 'NN', 'minpri': 3, 'comment': 'Sent by one node to read the data event from another node.(eg: RFID data). Response is 0xF7 (ARDAT).'},
        # DN = Device number
        '5B':  {'opc': 'RQDDS', 'title': 'Request device data - short mode', 'format': 'DNHigh_DNLow', 'minpri': 3, 'comment': 'To request a data set from a device using the short event method. where DN is the device number. Response is 0xFB (DDRS)'},
        '5C':  {'opc': 'BOOTM', 'title': 'Put node into bootload mode', 'format': 'NN', 'minpri': 3, 'comment': 'For SliM nodes with no NN then the NN of the command must be zero. For SLiM nodes with an NN, and all FLiM nodes the command must contain the NN of the target node. Sent by a configuration tool to prepare for loading a new program. Deprecated replaced by MODE'},
        '5D':  {'opc': 'ENUM', 'title': 'Force a self enumeration cyble for use with CAN', 'format': 'NN', 'minpri': 3, 'comment': 'For nodes in FLiM using CAN as transport. This OPC will force a self-enumeration cycle for the specified node. A new CAN_ID will be allocated if needed. Following the ENUM sequence, the node should issue a NNACK to confirm completion and verify the new CAN_ID. If no CAN_ID values are available, an error message 7 will be issued instead. Deprecated replaced with automatic self enumeration.'},
        '5F':  {'opc': 'EXTC1', 'title': 'Extended op-code with 1 additional byte', 'format': 'ExtOpc,Byte1', 'minpri': 3, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'},
        # 3 data byte packets
        '60':  {'opc': 'DFUN', 'title': 'Set Engine functions', 'format': 'Session,Fn1,Fn2', 'minpri': 2, 'comment': '<Dat2> (Fn1) is the function range. 1 is F0(FL) to F4; 2 is F5 to F8; 3 is F9 to F12; 4 is F13 to F20; 5 is F21 to F28; <Dat3> (Fn2) is the NMRA DCC format function byte for that range in corresponding bits. Sent by a CAB or equivalent to request an engine Fn state change.'},
        '61':  {'opc': 'GLOC', 'title': 'Get engine session', 'format': 'AddrHigh_AddrLow,Flags', 'minpri': 2, 'comment': '<Dat1> and <Dat2> are [AddrH] and [AddrL] of the decoder, respectively.; 7 bit addresses have (AddrH=0).; 14 bit addresses have bits 6,7 of AddrH set to 1.; <Flags> contains flag bits as follows:Bit 0: Set for "Steal" mode; Bit 1: Set for "Share" mode; Both bits set to 0 is exactly equivalent to an RLOC request; Both bits set to 1 is invalid, because the 2 modes are mutually exclusive; The command station responds with (PLOC) if the request is successful. Otherwise responds with (ERR): engine in use. (ERR:) stack full or (ERR) no session. The latter indicates that there is no current session to steal/share depending on the flag bits set in the request. GLOC with all flag bits set to zero is exactly equivalent to RLOC, but command stations must continue to support RLOC for backwards compatibility.'},
        '63':  {'opc': 'ERR', 'title': 'Command station error report', 'format': 'Byte1,Byte2,ErrCode', 'minpri': 2, 'comment': 'Sent in response to an error situation by a command station.'},
        '6F':  {'opc': 'CMDERR', 'title': 'Error messages from nodes during configuration', 'format': 'NN,Error', 'minpri': 3, 'comment': 'Sent by node if there is an error when a configuration command is sent. Deprecated replaced by GRSP.'},
        '70':  {'opc': 'EVNLF', 'title': 'Event space left reply from node', 'format': 'NN,EVSPC', 'minpri': 3, 'comment': 'EVSPC is a one byte value giving the number of available events left in that node.'},
        '71':  {'opc': 'NVRD', 'title': 'Request read of a node variable', 'format': 'NN,NVIndex', 'minpri': 3, 'comment': 'NV# is the index for the node variable value requested. Response is NVANS. VLCB also returns GRSP and support for NV#0.'},
        '72':  {'opc': 'NENRD', 'title': 'Request read of stored events by event index', 'format': 'NN,EnIndex', 'minpri': 3, 'comment': 'EN# is the index for the stored event requested. Response is 0xF2 (ENRSP)'},
        '73':  {'opc': 'RQNPN', 'title': 'Request read of a node parameter by index', 'format': 'NN,ParaIndex', 'minpri': 3, 'comment': 'Para# is the index for the parameter requested. Index 0 returns the number of available parameters, Response is 0x9B (PARAN). VLCB Para #0 returns a PARAN for each parameter'},
        '74':  {'opc': 'NUMEV', 'title': 'Number of events stored in node', 'format': 'NN,NumEvents', 'minpri': 3, 'comment': 'Response to request 0x58 (RQEVN)'},
        '75':  {'opc': 'CANID', 'title': 'Set a CAN_ID in existing FLiM node', 'format': 'NN,CAN_ID', 'minpri': 0, 'comment': 'Used to force a specified CAN_ID into a node. Value range is from 1 to 0x63 (99 decimal) This OPC must be used with care as duplicate CAN_IDs are not allowed. Values outside the permitted range will produce an error 7 message and the CAN_ID will not change. Deprecated replaced with self-enumaration. VLCB includes GRSP responses.'},
        '76':  {'opc': 'MODE', 'title': 'Request a change to a modules operating mode', 'format': 'NN,ModeCmd', 'minpri': 0, 'comment': 'Request to change the operational mode of the module. Mode cmds 0 = transition to setup mode, 1 = transition to normal mode, 16 = turn on FCU compat, 17 = turn off FCU compat. If supported then module returns GRSP. VLCB new features.'},
        '78':  {'opc': 'RQSD', 'title': 'Request service discover', 'format': 'NN,ServiceIndex', 'minpri': 0, 'comment': 'Request service data from a module if ServiceIndex is 0 then SD message sent, followed by ESD response for each services supported. VLCB new feature.'},
        '7F':  {'opc': 'EXTC2', 'title': 'Extended op-code with 2 additional bytes', 'format': 'ExtOpc,Byte1,Byte2', 'minpri': 0, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'},
        # 4 data byte packets
        '80':  {'opc': 'RDCC3', 'title': 'Request 3-byte DCC Packet', 'format': 'Rep,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': '<Dat1(REP)> is number of repetitions in sending the packet. <Dat2>..<Dat4> 3 bytes of the DCC packet. Allows a CAB or equivalent to request a 3 byte DCC packet to be sent to the track. The packet is sent <REP> times and is not refreshed on a regular basis. Note: a 3 byte DCC packet is the minimum allowed.'},
        '82':  {'opc': 'WCVO', 'title': 'Write CV (byte) in OPS mode', 'format': 'Session,CVHigh_CVLow,CVVal', 'minpri': 2, 'comment': '<Dat1> is the session number of the loco to be written to; <Dat2> is the MSB # of the CV to be written (supports CVs 1 - 65536); <Dat3> is the LSB # of the CV to be written; <Dat4> is the byte value to be written; Sent to the command station to write a DCC CV byte in OPS mode to specific loco.(on the main)'},
        '83':  {'opc': 'WCVB', 'title': 'Write CV (bit) in OPS mode', 'format': 'Session,CVHigh_CVLow,CVVal', 'minpri': 2, 'comment': '<Dat1> is the session number of the loco to be written to; <Dat2> is the MSB # of the CV to be written (supports CVs 1 - 65536); <Dat3> is the LSB # of the CV to be written; <Dat4> is the value to be written; The format for Dat4 is that specified in RP 9.2.1 for OTM bit manipulation in a DCC packet.; This is ?111CDBBB? where C is here is always 1 as only ?writes? are possible OTM. (unless some loco ACK scheme like RailCom is used). D is the bit value, either 0 or 1 and BBB is the bit position in the CV byte. 000 to 111 for bits 0 to 7.; Sent to the command station to write a DCC CV in OPS mode to specific loco.(on the main)'},
        '84':  {'opc': 'QCVS', 'title': 'Read CV', 'format': 'Session,CVHigh_CVLow,Mode', 'minpri': 2, 'comment': 'This command is used exclusively with service mode.; Sent by the cab to the command station in order to read a CV value. The command station shall respond with a PCVS message containing the value read, or SSTAT if the CV cannot be read.'},
        '85':  {'opc': 'PCVS', 'title': 'Report CV', 'format': 'Session,CVHigh_CVLow,CVVal', 'minpri': 2, 'comment': '<Dat1> is the session number of the cab; <Dat2> is the MSB # of the CV read (supports CVs 1 - 65536); <Dat3> is the LSB # of the CV read; <Dat4> is the read value; This command is used exclusively with service mode.; Sent by the command station to report a read CV.'},
        '87':  {'opc': 'RDGN', 'title': 'Request dianostic data', 'format': 'NN,ServiceIndex,DiagCode', 'minpri': 0, 'comment': 'Request diagnostic data from a module. If DiagCode is 0 then all data returned. If ServiceIndex 0 then send DGN message for each service, otherwise send DGN for service specified'},
        '8E':  {'opc': 'NVSETRD', 'title': 'Set an NV value with read back', 'format': 'NN,NNIndex,NVVal', 'minpri': 0, 'comment': 'Sets an NV value and responds with the new value, response may not be the value requested. VLCB new feature.'},
        '90':  {'opc': 'ACON', 'title': 'Accessory ON', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; Indicates an ?ON? event using the full event number of 4 bytes. (long event)'},
        '91':  {'opc': 'ACOF', 'title': 'Accessory OFF', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; Indicates an ?OFF? event using the full event number of 4 bytes. (long event)'},
        '92':  {'opc': 'AREQ', 'title': 'Accessory Request Event', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number (MS WORD of the full event #); <Dat2> is the low byte of the node number (MS WORD of the full event #); <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; Indicates a ?request? event using the full event number of 4 bytes. (long event); A request event is used to elicit a status response from a producer when it is required to know the state of the producer without producing an ON or OFF event and to trigger an event from a combi node'},
        '93':  {'opc': 'ARON', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': 'Indicates an ?ON? response event. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        '94':  {'opc': 'AROF', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; Indicates an ‘OFF’ response event. A response event is a reply to a status request; (AREQ) without producing an ON or OFF event'},
        '95':  {'opc': 'EVULN', 'title': 'Unlearn an event in learn mode', 'format': 'NN,EnHigh_EnLow', 'minpri': 3, 'comment': 'Sent by a configuration tool to remove an event from a node. VLCB also return GRSP.'},
        # NVIndex is NV Index number
        '96':  {'opc': 'NVSET', 'title': 'Set a node variable', 'format': 'NN,NVIndex,NVVal', 'minpri': 3, 'comment': 'Sent by a configuration tool to set a node variable. NV# is the NV index number. Deprecated replaced by NVSETRD. VLCB also return GRSP.'},
        '97':  {'opc': 'NVANS', 'title': 'Response to a request for a node variable value', 'format': 'NN,NVIndex,NVVal', 'minpri': 3, 'comment': 'Sent by node in response to request. (NVRD)'},
        # Short events
        # DNHigh, DNLow = the lower two bytes define the device number (treated the same as a device address); the full 4-byte event is still sent
        # <Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the Device Number; <Dat4> is the low byte of the Device Number
        '98':  {'opc': 'ASON', 'title': 'Accessory short ON', 'format': 'NN,DNHigh_DNLow', 'minpri': 3, 'comment': 'Indicates an ‘ON’ event using the short event number of 2 LS bytes.'},
        '99':  {'opc': 'ASOF', 'title': 'Accessory short OFF', 'format': 'NN,DNHigh_DNLow', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ event using the short event number of 2 LS bytes.'},
        '9A':  {'opc': 'ASRQ', 'title': 'Accessory Short Request Event', 'format': 'NN,DNHigh_DNLow', 'minpri': 3, 'comment': 'Indicates a ‘request’ event using the short event number of 2 LS bytes. A request event is used to elicit a response from a producer ‘device’ when it is required to know the ‘state’ of the device without producing an ON or OFF event and to trigger an event from a combi node.'},
        # ParaIndex = Index of parameter
        # ParaVal = Parameter value
        '9B':  {'opc': 'PARAN', 'title': 'Response to request for individual node parameter', 'format': 'NN,ParaIndex,ParaVal', 'minpri': 3, 'comment': 'NN is the node number of the sending node. Para# is the index of the parameter and Para val is the parameter value.'},
        # EnIndex is event index
        # EvIndex is Event variable index
        '9C':  {'opc': 'REVAL', 'title': 'Request for read of an event variable', 'format': 'NN,EnIndex,EvIndex', 'minpri': 3, 'comment': 'This request differs from B2 (REQEV) as it doesn’t need to be in learn mode but does; require the knowledge of the event index to which the EV request is directed.; EN# is the event index. EV# is the event variable index. Response is B5 (NEVAL)'},
        '9D':  {'opc': 'ARSON', 'title': 'Accessory short response event', 'format': 'NN,DNHigh_DNLow', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        '9E':  {'opc': 'ARSOF', 'title': 'Accessory short response event', 'format': 'NN,DNHigh_DNLow', 'minpri': 3, 'comment': 'ndicates an ‘OFF’ response event. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        '9F':  {'opc': 'EXTC3', 'title': 'Extended op-code with 3 additional bytes', 'format': 'ExtOpc,Byte1,Byte2,Byte3', 'minpri': 0, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'},
        # 5 data byte packets
        # Rep = repeat
        'A0':  {'opc': 'RDCC4', 'title': 'Request 4-byte DCC packet', 'format': 'Rep,Byte1,Byte2,Byte3,Byte4', 'minpri': 2, 'comment': '<Dat1(REP)> is number of repetitions in sending the packet.; <Dat2>..<Dat5> 4 bytes of the DCC packet.; Allows a CAB or equivalent to request a 4 byte DCC packet to be sent to the track. The; packet is sent <REP> times and is not refreshed on a regular basis.'},
        # CVHigh = MSB of the CV number (CVs 1-65536)
        # CVLow = LSB of the CV number
        # Mode = service write mode
        # CVVal = CV value
        'A2':  {'opc': 'WCVS', 'title': 'Write CV in Service Mode', 'format': 'Session,CVHigh_CVLow,Mode,CVVal', 'minpri': 0, 'comment': '<Dat1> is the session number of the cab; <Dat2> is the MSB # of the CV to be written (supports CVs 1 - 65536); <Dat3> is the LSB # of the CV to be written; <Dat4> is the service write mode; <Dat5> is the CV value to be written; Sent to the command station to write a DCC CV in service mode.'},
        'AB':  {'opc': 'HEARTB', 'title': 'Heartbeat message from module', 'format': 'NN,Sequence,Status,StatusBits', 'minpri': 0, 'comment': 'Hearbeat message from module indicating alive. Sent every 5 seconds by module. Sequence count from 0, incrementing and wrap around to 0., Statis is binary representation of diagnostic status 0x00 is normal operation. StatusBits is reserved set to 0x00. VLCB new feature.'},
        'AC':  {'opc': 'SD', 'title': 'Service discovery response', 'format': 'NN,ServiceIndex,ServiceType,Version', 'minpri': 0, 'comment': 'Version of service supported response to RQSD with ServiceIndex = 0. First SD response is number of following SD responses. Also see ESD. VLCB new feature.'},
        'AF':  {'opc': 'GRSP', 'title': 'Generic response', 'format': 'NN,Opcode,ServiceType,Result', 'minpri': 0, 'comment': 'Generic response for a config change request. Result byte indicates ok for success or error code. CMDERR codes are supported. VLCB new feature.'},
        'B0':  {'opc': 'ACON1', 'title': 'Accessory ON', 'format': 'NN,EnHigh_EnLow,Byte1', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; <Dat5> is an additional data byte; Indicates an ‘ON’ event using the full event number of; 4 bytes with one additional data byte.'},
        'B1':  {'opc': 'ACOF1', 'title': 'Accessory OFF', 'format': 'NN,EnHigh_EnLow,Byte1', 'minpri': 3, 'comment': '<Dat1> is the high byte of the node number; <Dat2> is the low byte of the node number; <Dat3> is the high byte of the event number; <Dat4> is the low byte of the event number; <Dat5> is an additional data byte; Indicates an ‘OFF’ event using the full event number of 4 bytes with one additional data byte.'},
        'B2':  {'opc': 'REQEV', 'title': 'Read event variable in learn mode', 'format': 'NN,EnHigh_EnLow,EvIndex', 'minpri': 3, 'comment': 'Allows a configuration tool to read stored event variables from a node. EV# is the EV index. Reply is (EVANS)'},
        'B3':  {'opc': 'ARON1', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with one additional data byte. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        'B4':  {'opc': 'AROF1', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ response event with one additional data byte. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        # NN (NNHigh, NNLow) = node number of the node replying
        # EvVal is value of Event Variable
        'B5':  {'opc': 'NEVAL', 'title': 'Response to request for read of EV value', 'format': 'NN,EnIndex,EvIndex,EvVal', 'minpri': 3, 'comment': 'NN is the node replying. EN# is the index of the event in that node. EV# is the index of the event variable. EVval is the value of that EV. This is response to 9C (REVAL)'},
        'B6':  {'opc': 'PNN', 'title': 'Response to Query Node', 'format': 'NN,ManufId,ModId,Flags', 'minpri': 3, 'comment': '<NN Hi> is the high byte of the node number; <NN Lo> is the low byte of the node number; <Manuf Id> is the Manufacturer id as defined in the node parameters; <Module Id> is the Module Type Id id as defined in the node parameters; <Flags> is the node flags as defined in the node parameters. The Flags byte contains bit flags as follows:; Bit 0: Set to 1 for consumer node; Bit 1: Set to 1 for producer node; Bit 2: Set to 1 for FLiM mode; Bit 3: Set to 1 for Bootloader compatible; If a module is both a producer and a consumer then it is referred to as a combi node and; both flags will be set.; Every node should send this message in response to a QNN message.'},
        'B8':  {'opc': 'ASON1', 'title': 'Accessory Short ON', 'format': 'NN,DNHigh_DNLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘ON’ event using the short event number of 2 LS bytes with one added data byte.'},
        'B9':  {'opc': 'ASOF1', 'title': 'Accessory Short OFF', 'format': 'NN,DNHigh_DNLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ event using the short event number of 2 LS bytes with one added data byte.'},
        'BD':  {'opc': 'ARSON1', 'title': 'Accessory Short Response Event with one data byte', 'format': 'NN,DNHigh_DNLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with one added data byte. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        'BE':  {'opc': 'ARSOF1', 'title': 'Accessory short response event with one data byte', 'format': 'NN,DNHigh_DNLow,Byte1', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ response event with one added data byte. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        'BF':  {'opc': 'EXTC4', 'title': 'Extended op-code with 4 data bytes', 'format': 'ExtOpc,Byte1,Byte2,Byte3,Byte4', 'minpri': 3, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs.'},
        # 6 data byte packets
        'C0':  {'opc': 'RDCC5', 'title': 'Requst 5-byte DCC packet', 'format': 'Rep,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 2, 'comment': '<Dat1(REP)> is # of repetitions in sending the packet.; <Dat2>..<Dat6> 5 bytes of the DCC packet.; Allows a CAB or equivalent to request a 5 byte DCC packet to be sent to the track. The packet is sent <REP> times and is not refreshed on a regular basis.'},
        'C1':  {'opc': 'WCVOA', 'title': 'Write CV (byte) in OPS mode by address', 'format': 'AddrHigh_AddrLow,CVHigh_CVLow,Mode,CVVal', 'minpri': 2, 'comment': '<Dat1> and <Dat2> are [AddrH] and [AddrL] of the decoder, respectively.; 7 bit addresses have (AddrH=0).; 14 bit addresses have bits 7,8 of AddrH set to 1.; <Dat3> is the MSB # of the CV to be written (supports CVs 1 - 65536); <Dat4> is the LSB # of the CV to be written; <Dat5> is the programming mode to be used; <Dat6> is the CV byte value to be written; Sent to the command station to write a DCC CV byte in OPS mode to specific loco (on the main). Used by computer based ops mode programmer that does not have a valid throttle handle.'},
        'C2':  {'opc': 'CABDAT', 'title': 'Send data to DCC CAB which is controlling loco', 'format': 'AddrHigh_AddrLow,DataCode,Byte1,Byte2,Byte3', 'minpri': 1, 'comment': 'Send data to DCC CAB controlling particular loco. CABSIG data1 for aspect1, data2 for aspect2, data3 for speed. Defined in RFC0005.'},
        'C7':  {'opc': 'DGN', 'title': 'Dianostic data resonse', 'format': 'NN,ServiceIndex,DiagCode,DiagVal', 'minpri': 0, 'comment': 'Diagnostic data value from a module sent in response to RDGN. VLCB new features'},
        'CF':  {'opc': 'FCLK', 'title': 'Fast Clock', 'format': 'DateTime', 'minpri': 3, 'comment': 'This addendum defines a time encoding'},
        'D0':  {'opc': 'ACON2', 'title': 'Accessory ON', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘ON’ event using the full event number of 4 bytes with two additional data bytes.'},
        'D1':  {'opc': 'ACOF2', 'title': 'Accessory OFF', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2', 'minpri': 3, 'comment': 'ndicates an ‘OFF’ event using the full event number of 4 bytes with two additional data bytes.'},
        'D2':  {'opc': 'EVLRN', 'title': 'Teach an event in learn mode', 'format': 'NN,EnHigh_EnLow,EvIndex,EvVal', 'minpri': 3, 'comment': 'A node response to a request from a configuration tool for the EVs associated with an event (REQEV). For multiple EVs, there will be one response per request. VLCB also return GRSP.'},
        'D3':  {'opc': 'EVANS', 'title': 'Response to a request for an EV value in a node in learn mode', 'format': 'NN,EnHigh_EnLow,EvIndex,EvVal', 'minpri': 3, 'comment': 'A node response to a request from a configuration tool for the EVs associated with an event (REQEV). For multiple EVs, there will be one response per request.'},
        'D4':  {'opc': 'ARON2', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with two added data bytes. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        'D5':  {'opc': 'AROF2', 'title': 'Accessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ response event with two added data bytes. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        'D8':  {'opc': 'ASON2', 'title': 'Accessory Short ON', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘ON’ event using the short event number of 2 LS bytes with two added data bytes.'},
        'DD':  {'opc': 'ARSON2', 'title': 'Accessory Short Response Event with two bytes', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with two added data bytes. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        'DE':  {'opc': 'ARSOF2', 'title': 'Accessory Short Response Event with two bytes','format': 'NN,DNHigh_DNLow,Byte1,Byte2', 'minpri': 3, 'comment': 'Indicates an ‘OFF’ response event with two added data bytes. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event.'},
        'DF':  {'opc': 'EXTC5', 'title': 'Extended op-code with 5 data bytes', 'format': 'ExtOpc,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 3, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'},
         # 7 data byte packets
        'E0':  {'opc': 'RDCC6', 'title': 'Request 6 byte DCC packet', 'format': 'Rep,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 2, 'comment': 'Allows a CAB or equivalent to request a 6 byte DCC packet to be sent to the track. The packet is sent <REP> times and is not refreshed on a regular basis.'},
        'E1':  {'opc': 'PLOC', 'title': 'Engine Report', 'format': 'Session,AddrHigh_AddrLow,SpeedDir,Fn1,Fn2,Fn3', 'minpri': 2, 'comment': '<Dat4> is the Speed/Direction value. Bit 7 is the direction bit and bits 0-6 are the speed value.; <Dat5> is the function byte F0 to F4; <Dat6> is the function byte F5 to F8; <Dat7> is the function byte F9 to F12; A report of an engine entry sent by the command station. Sent in response to QLOC or as an acknowledgement of acquiring an engine requested by a cab (RLOC or GLOC).'},
        'E2':  {'opc': 'NAME', 'title': 'Response to request for node name string', 'format': 'Char1_7', 'minpri': 3, 'comment': 'A node response while in ‘setup’ mode for its name string. Reply to (RQMN). The string for the module type is returned in char1 to char7, space filled to 7 bytes. The Module Name prefix , currently either CAN or ETH, depends on the Interface Protocol parameter, it is not included in the response, see section 12.2 for the definition of the parameters.'},
        'E3':  {'opc': 'STAT', 'title': 'Command station status report', 'format': 'NN,CSNum,Flags,MajRev,MinRev,Build', 'minpri': 2, 'comment': '<NN hi> <NN lo> Gives node id of command station, so further info can be got from parameters or interrogating NVs; <CS num> For future expansion - set to zero at present; <flags> Flags as defined below; <Major rev> Major revision number; <Minor rev> Minor revision letter; <Build no.> Build number, always 0 for a released version.; <flags> is status defined by the bits below.; bits:; 0 - Hardware Error (self test); 1 - Track Error; 2 - Track On/ Off; 3 - Bus On/ Halted; 4 - EM. Stop all performed; 5 - Reset done; 6 - Service mode (programming) On/ Off; 7 – reserved; Sent by the command station in response to RSTAT.'},
        'E7':  {'opc': 'ESD', 'title': 'Extended service discovery response', 'format': 'NN,ServiceIndex,ServiceType,Byte1,Byte2,Byte3', 'minpri':0, 'comment': 'Detailed information about a service supported by a module. Sent in response to RQSD where ServiceIndex is not 0. VLCB new feature'},
        'E9':  {'opc': 'DTXC', 'title': 'Streaming protocol', 'format': 'StreamID,Sequence,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 0, 'comment': 'Used to transport relatively large block of data. StreamID is unique layout wide (> 20). If Sequence num is 0x00 then bytes are MessageLen (2 bytes), CRC16 (2 bytes), Flags (1 byte reserved). Defined in RFC0005'},
        'EF':  {'opc': 'PARAMS', 'title': 'Response to request for node parameters', 'format': 'Para1_7', 'minpri': 3, 'comment': 'A node response while in ‘setup’ mode for its parameter string. Reply to (RQNP)'},
        'F0':  {'opc': 'ACON3', 'title': 'Accessory ON', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an ON event using the full event number of 4 bytes with three additional data bytes.'},
        'F1':  {'opc': 'ACOF3', 'title': 'Accessory OFF', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an OFF event using the full event number of 4 bytes with three additional data bytes.'},
        'F2':  {'opc': 'ENRSP', 'title': 'Response to request to read node events', 'format': 'NN,En3_0,EnIndex', 'minpri': 3, 'comment': 'Where the NN is that of the sending node. EN3 to EN0 are the four bytes of the stored event. EN# is the index of the event within the sending node. This is a response to either 57 (NERD) or 72 (NENRD)'},
        'F3':  {'opc': 'ARON3', 'title': 'Acessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with three added data bytes. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        'F4':  {'opc': 'AROF3', 'title': 'Acessory Response Event', 'format': 'NN,EnHigh_EnLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an ‘ON’ response event with three added data bytes. A response event is a reply to a status request (AREQ) without producing an ON or OFF event.'},
        'F5':  {'opc': 'EVLRNI', 'title': 'Teach and event in learn mode using event indexing', 'format': 'NN,EnHigh_EnLow,EnIndex,EvIndex,EvVal', 'minpri': 3, 'comment': 'Sent by a configuration tool to a node in learn mode to teach it an event. The event index must be known. Also teaches it the associated event variables.(EVs). This command is repeated for each EV required. VLCB allow zero events and zero EVid, also return GRSP.'},
        'F6':  {'opc': 'ACDAT', 'title': 'Accessory node data event', 'format': 'NN,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 3, 'comment': 'Indicates an event from this node with 5 bytes of data. For example, this can be used to send the 40 bits of an RFID tag. There is no event number in order to allow space for 5 bytes of data in the packet, so there can only be one data event per node.'},
        'F7':  {'opc': 'ARDAT', 'title': 'Accessory node data response', 'format': 'NN,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 3, 'comment': 'Indicates a node data response. A response event is a reply to a status request (RQDAT) without producing a new data event.'},
        'F8':  {'opc': 'ASON3', 'title': 'Accessory Short ON', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an ON event using the short event number of 2 LS bytes with three added data bytes.'},
        'F9':  {'opc': 'ASOF3', 'title': 'Accessory Short OFF', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': 'Indicates an OFF event using the short event number of 2 LS bytes with three added data bytes.'},
        'FA':  {'opc': 'DDES', 'title': 'Device data event (short mode)', 'format': 'DNHigh_DNLow,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 3, 'comment': 'Function is the same as F6 but uses device addressing so can relate data to a device attached to a node. e.g. one of several RFID readers attached to a single node.'},
        'FB':  {'opc': 'DDRS', 'title': 'Device data response (short mode)', 'format': 'DNHigh_DNLow,Byte1,Byte2,Byte3,Byte4,Byte5', 'minpri': 3, 'comment': 'The response to a request for data from a device. (0x5B)'},
        'FC':  {'opc': 'DDWS', 'title': 'Write data', 'format': 'DNHigh_DNLow,byte1,byte2,byte3,byte4,byte5', 'minpri': 0, 'comment': 'Used to write data to a device such as an RFID tag. For RC522 byte1 should be 0.'},
        'FD':  {'opc': 'ARSON3', 'title': 'Accessory Short Response Event', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': "Indicates an ON response event with with three added data bytes. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event."},
        'FE':  {'opc': 'ARSOF3', 'title': 'Accessory Short Response Event', 'format': 'NN,DNHigh_DNLow,Byte1,Byte2,Byte3', 'minpri': 3, 'comment': "Indicates an OFF response event with with three added data bytes. A response event is a reply to a status request (ASRQ) without producing an ON or OFF event."},
        'FF':  {'opc': 'EXTC6', 'title': 'Extended op-code with 6 data bytes', 'format': 'ExtOpc,Byte1,Byte2,Byte3,Byte4,Byte5,Byte6', 'minpri': 3, 'comment': 'Used if the basic set of 32 OPCs is not enough. Allows an additional 256 OPCs'}
        }

    # Number of chars (2 per byte) in each field and how to show it to the user (eg. hex / num / char)
    # Num vs Hex is just suggested formatting - stored as a number the same
    # In char then it's ascii - otherwise normally convert to a number and return that
    field_formats = {
            # Each entry is [number of hex chars (2 per byte), suggested display type]
            "Unknown": [2, "hex"],        # Generic if not known how to format
            "CAN_ID": [2, "num"],         # Canbus id
            "Session": [2, "hex"],        # Engine session number
            "AddrHigh_AddrLow": [4, "num"], # Address of decoder (7 or 14 bit) - 14bit have bits 6,7 of AddrHigh set to 1
            "Consist": [2, "num"],        # Consist ID
            "Index": [2, "num"],          # Index of loco in consist
            "Status": [2, "hex"],
            "NN": [4, "num"],    # Node number
            "AllocCode": [2, "num"],       # Specific allocation code
            "SpeedDir": [2, "hex"],
            "SpeedFlag": [2, "hex"],       # Speed flags DDD-DDDDD
            "Fnum": [2, "num"],            # Function number
            "Fn1": [2,"num"],              # Function number (multi byte)
            "Fn2": [2,"num"],              # Function number (multi byte)
            "Fn3": [2,"num"],              # Function number (multi byte)
            "DNHigh_DNLow": [4, "num"],    # Device number (aka Device address)
            "ExtOpc": [2, "char"],         # Extended op code
            "Byte1": [2, "hex"],           # Byte (eg extended)
            "Byte2": [2, "hex"],           # Byte (eg extended)
            "Byte3": [2, "hex"],           # Byte (eg extended)
            "Byte4": [2, "hex"],           # Byte (eg extended)
            "Byte5": [2, "hex"],           # Byte (eg extended)
            "Byte6": [2, "hex"],           # Byte (eg extended)
            # CV number is sent as two bytes (MSB then LSB) so it needs 4 hex chars,
            # the same as the other High/Low pairs
            "CVHigh_CVLow": [4, "num"],    # CV number (DCC Configuration variables)
            "CVVal": [2, "num"],           # CV value
            "NVIndex": [2, "num"],         # NV Node variable index
            "NVVal": [2, "hex"],           # NV value
            "NumEvents": [2, "num"],       # Number of events
            "ParaIndex": [2, "num"],       # Index of node parameter
            "ParaVal": [2, "num"],         # Value of node parameter
            "Para1_7": [14, "hex"],        # Multiple parameters
            "EnHigh_EnLow": [4, "num"],    # En
            "EnIndex": [2, "num"],         # Index of Event
            "EvIndex": [2, "num"],         # Index of Event variable
            "EvVal": [2, "num"],           # Event variable value
            "Rep": [2, "num"],             # Repeat, num of times to send signal
            "Mode": [2, "hex"],            # Service write mode
            "ManufId": [2, "hex"],         # Manufacturer
            "ModId": [2, "hex"],           # Module
            "Flags": [2, "hex"],           # Module flags
            "CSNum": [2, "hex"],           # Zero (future expansion)
            "MajRev": [2, "num"],          # Major revision number
            "MinRev": [2, "ascii"],        # Minor revision letter
            "Build": [2, "num"],           # Built number always 0 for a released version
            "DateTime": [8, "hex"],        # Date and time encoded
            "Char1_7": [14, "ascii"],      # Name string 7 bytes long - padded
            "En3_0": [8, "hex"],           # 4 bytes of stored event
            "EVSPC": [2, "num"],           # Amount of space available for events
            "ErrCode": [2, "hex"],         # Short 1 byte error code
            "Error": [4, "hex"],           # Error code
            "ModeCmd": [2, "num"],         # New VLCB mode command
            "ServiceIndex": [2, "hex"],    # New VLCB Index of services
            "DiagCode": [2, "hex"],        # New VLCB Diagnostic data code
            "DiagVal": [4, "hex"],         # New VLCB Diagnostic data value
            "Sequence": [2, "num"],        # New VLCB Sequence number (also RFC0005 SequenceNum)
            "StatusBits": [2, "hex"],      # New VLCB StatusBits
            "ServiceType": [2, "hex"],     # New VLCB Service Type
            "Version": [2, "num"],         # New VLCB Version of service definition
            "Opcode": [2, "hex"],          # New VLCB Requested Opcode (in response)
            "Result": [2, "hex"],          # New VLCB Result from GRSP
            "DataCode": [2, "hex"],        # Used in CABDAT meaning of next 3 bytes
            "StreamID": [2, "num"]         # StreamID must be unique for the layout
        }

    # Accessory Codes - provided for convenient lookup
    accessory_codes = {
        # Mnemonics of the accessory ON events (long and short, 0 to 3 data bytes)
        'on': ['ACON', 'ASON', 'ACON1', 'ACON2', 'ACON3', 'ASON1', 'ASON2', 'ASON3'],
        # Mnemonics of the matching accessory OFF events, in the same order
        'off': ['ACOF', 'ASOF', 'ACOF1', 'ACOF2', 'ACOF3', 'ASOF1', 'ASOF2', 'ASOF3'],
    }

    # DCC error codes as byte string lookup
    dcc_error_codes = {
        # Keys are the 2 character hex form of the error byte
        '01': 'Loco stack full',
        '02': 'Loco address taken',
        '03': 'Session not present',
        '04': 'Consist empty',
        '05': 'Loco not found',
        '06': 'CAN bus error',
        '07': 'Invalid request',
        '08': 'Session cancelled',
    }

    # CMDERR / GRSP error codes as byte string lookup
    grsp_error_codes = {
        # Keys are the 2 character upper case hex form of the result byte
        '00': 'OK',
        # Codes 0x01 - 0x0D
        '01': 'Invalid command',
        '02': 'Not in learn mode',
        '03': 'Not in setup mode',
        '04': 'Too many events',
        '05': 'No event',
        '06': 'Invalid event variable index',
        '07': 'Invalid event',
        '08': 'Reserved',
        '09': 'Invalid parameter index',
        '0A': 'Invalid node variable index',
        '0B': 'Invalid event variable value',
        '0C': 'Invalid node variable value',
        '0D': 'Other in learn mode',
        # Codes 0xFA - 0xFF
        'FA': 'Invalid mode',
        'FB': 'Invalid command parameter',
        'FC': 'Invalid service',
        'FD': 'Invalid diagnostic',
        'FE': 'Unknown NVM type',
        'FF': 'Reserved',
    }



    # Shorten op-code (remove extra characters)
    # Used to allow methods to be used if mnemonic is included in op-code
    # Or you could pass the entire data section
    @staticmethod
    def opcode_extract (opcode_string: str) -> str:
        """Return the leading 2 characters of a string as the op-code

        Lets callers hand in a bare op-code, an op-code followed by extra
        characters, or a complete data section.

        Args:
            opcode_string: String that starts with the 2 character op-code

        Returns:
            String: the first 2 characters of opcode_string

        Raises:
            ValueError: If string does not contain at least 2 characters
        """
        # Guard clause: anything shorter cannot hold a complete op-code
        if len(opcode_string) < 2:
            raise ValueError(f"String {opcode_string} is too short.")
        return opcode_string[:2]

    # Get min priority from opcode
    @staticmethod
    def opcode_priority (opcode: str) -> int:
        """Look up the minimum priority stored for an op-code

        Args:
            opcode: String that starts with the 2 character op-code

        Returns:
            int: the 'minpri' value of the op-code's table entry

        Raises:
            ValueError: If opcode is shorter than 2 characters or is not
                in the op-code table
        """
        opcode = VLCBOpcode.opcode_extract(opcode)
        # Reject unknown op-codes first so the lookup below cannot fail
        if opcode not in VLCBOpcode.opcodes:
            raise ValueError(f"Opcode {opcode} is not defined.")
        return VLCBOpcode.opcodes[opcode]['minpri']

    # Title of opcode (used in tooltip)
    @staticmethod
    def opcode_title (opcode: str) -> str:
        """Look up the human readable title of an op-code

        Args:
            opcode: String that starts with the 2 character op-code

        Returns:
            String: the 'title' value of the op-code's table entry

        Raises:
            ValueError: If opcode is shorter than 2 characters or is not
                in the op-code table
        """
        opcode = VLCBOpcode.opcode_extract(opcode)
        # Reject unknown op-codes first so the lookup below cannot fail
        if opcode not in VLCBOpcode.opcodes:
            raise ValueError(f"Opcode {opcode} is not defined.")
        return VLCBOpcode.opcodes[opcode]['title']

    # Convert op-code to mnemonic
    @staticmethod
    def opcode_mnemonic (opcode: str) -> str:
        """Get mnemonic from opcode

        Args:
            opcode: String that starts with the 2 character op-code

        Returns:
            String: the 'opc' (mnemonic) value of the op-code's table entry

        Raises:
            ValueError: If opcode is shorter than 2 characters or is not
                in the op-code table
        """
        opcode = VLCBOpcode.opcode_extract(opcode)
        # Reject unknown op-codes first so the lookup below cannot fail
        if opcode not in VLCBOpcode.opcodes:
            raise ValueError(f"Opcode {opcode} is not defined.")
        return VLCBOpcode.opcodes[opcode]['opc']

    # Parse the data based on the format str and store in a dictionary
    @staticmethod
    def parse_data (data: str) -> OpcodeData:
        """Split a data string into its op-code and named fields

        The first 2 characters select the op-code. The remainder is cut up
        following that op-code's 'format' entry, using the widths held in
        field_formats.

        Args:
            data: Data string that starts with the 2 character op-code

        Returns:
            OpcodeData: Dict with 'opid' (the 2 character op-code), 'opcode'
                (its mnemonic, or "UNKNOWN" when the op-code is not in the
                table), one entry per format field and, if characters are
                left over, 'ExtraData'

        Raises:
            ValueError: If data is shorter than 2 characters, or a field
                that is converted to a number is not valid hex
        """
        opid = VLCBOpcode.opcode_extract(data)
        # Everything after the op-code is field data, consumed from the front
        data = data[2:]

        # An unknown op-code is not an error: it simply has no fields to parse
        entry = VLCBOpcode.opcodes.get(opid)
        if entry is None:
            field_spec = ""
            mnemonic = "UNKNOWN"
        else:
            field_spec = entry['format']
            mnemonic = entry['opc']
        parsed = {'opid': opid, 'opcode': mnemonic}

        for this_field in field_spec.split(','):
            # An empty name means there is no format - stop and keep any data as extra
            if this_field == "":
                break
            # Unrecognised names fall back to the generic "Unknown" field definition
            if this_field not in VLCBOpcode.field_formats:
                logging.warning (f"Warning format field {this_field} not recognised")
                this_field = "Unknown"
            width, kind = VLCBOpcode.field_formats[this_field]
            if len(data) < width:
                # Too few characters left: record what there was and drop it
                parsed[this_field] = f"Insufficient data {data}"
                data = ""
                continue
            chunk, data = data[:width], data[width:]
            # Only "char" fields stay as text, every other type is read as a hex number
            parsed[this_field] = chunk if kind == "char" else int(chunk, 16)

        # Characters beyond the last field are kept (shouldn't normally get this)
        if data:
            parsed["ExtraData"] = data
        return parsed

opcode_extract(opcode_string) staticmethod

Shortens op code by removing extra characters

Returns:

Name Type Description
String str

shortened opcode string

Raises:

Type Description
ValueError

If string does not contain at least 2 characters

Source code in src/pyvlcb/vlcbformat.py
459
460
461
462
463
464
465
466
467
468
469
470
471
472
@staticmethod
def opcode_extract (opcode_string: str) -> str:
    """Return the leading 2 characters of a string as the op-code

    Lets callers hand in a bare op-code, an op-code followed by extra
    characters, or a complete data section.

    Args:
        opcode_string: String that starts with the 2 character op-code

    Returns:
        String: the first 2 characters of opcode_string

    Raises:
        ValueError: If string does not contain at least 2 characters
    """
    # Guard clause: anything shorter cannot hold a complete op-code
    if len(opcode_string) < 2:
        raise ValueError(f"String {opcode_string} is too short.")
    return opcode_string[:2]

opcode_mnemonic(opcode) staticmethod

Get mnemonic from opcode

Returns:

Name Type Description
String

Opcode mnemonic

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
@staticmethod
def opcode_mnemonic (opcode: str) -> str:
    """Get mnemonic from opcode

    Args:
        opcode: String that starts with the 2 character op-code

    Returns:
        String: the 'opc' (mnemonic) value of the op-code's table entry

    Raises:
        ValueError: If opcode is shorter than 2 characters or is not
            in the op-code table
    """
    opcode = VLCBOpcode.opcode_extract(opcode)
    # Reject unknown op-codes first so the lookup below cannot fail
    if opcode not in VLCBOpcode.opcodes:
        raise ValueError(f"Opcode {opcode} is not defined.")
    return VLCBOpcode.opcodes[opcode]['opc']

opcode_priority(opcode) staticmethod

Get priority from opcode

Returns:

Name Type Description
int int

priority value

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
@staticmethod
def opcode_priority (opcode: str) -> int:
    """Look up the minimum priority stored for an op-code

    Args:
        opcode: String that starts with the 2 character op-code

    Returns:
        int: the 'minpri' value of the op-code's table entry

    Raises:
        ValueError: If opcode is shorter than 2 characters or is not
            in the op-code table
    """
    opcode = VLCBOpcode.opcode_extract(opcode)
    # Reject unknown op-codes first so the lookup below cannot fail
    if opcode not in VLCBOpcode.opcodes:
        raise ValueError(f"Opcode {opcode} is not defined.")
    return VLCBOpcode.opcodes[opcode]['minpri']

opcode_title(opcode) staticmethod

Get title from opcode

Returns:

Name Type Description
String str

Opcode Title

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
@staticmethod
def opcode_title (opcode: str) -> str:
    """Look up the human readable title of an op-code

    Args:
        opcode: String that starts with the 2 character op-code

    Returns:
        String: the 'title' value of the op-code's table entry

    Raises:
        ValueError: If opcode is shorter than 2 characters or is not
            in the op-code table
    """
    opcode = VLCBOpcode.opcode_extract(opcode)
    # Reject unknown op-codes first so the lookup below cannot fail
    if opcode not in VLCBOpcode.opcodes:
        raise ValueError(f"Opcode {opcode} is not defined.")
    return VLCBOpcode.opcodes[opcode]['title']

parse_data(data) staticmethod

Returns the data associated with the data string as a dict

Returns:

Name Type Description
OpcodeData OpcodeData

Dict in OpcodeData format

Raises:

Type Description
ValueError

If opcode not found

Source code in src/pyvlcb/vlcbformat.py
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
@staticmethod
def parse_data (data: str) -> OpcodeData:
    """Split a data string into its op-code and named fields

    The first 2 characters select the op-code. The remainder is cut up
    following that op-code's 'format' entry, using the widths held in
    field_formats.

    Args:
        data: Data string that starts with the 2 character op-code

    Returns:
        OpcodeData: Dict with 'opid' (the 2 character op-code), 'opcode'
            (its mnemonic, or "UNKNOWN" when the op-code is not in the
            table), one entry per format field and, if characters are
            left over, 'ExtraData'

    Raises:
        ValueError: If data is shorter than 2 characters, or a field
            that is converted to a number is not valid hex
    """
    opid = VLCBOpcode.opcode_extract(data)
    # Everything after the op-code is field data, consumed from the front
    data = data[2:]

    # An unknown op-code is not an error: it simply has no fields to parse
    entry = VLCBOpcode.opcodes.get(opid)
    if entry is None:
        field_spec = ""
        mnemonic = "UNKNOWN"
    else:
        field_spec = entry['format']
        mnemonic = entry['opc']
    parsed = {'opid': opid, 'opcode': mnemonic}

    for this_field in field_spec.split(','):
        # An empty name means there is no format - stop and keep any data as extra
        if this_field == "":
            break
        # Unrecognised names fall back to the generic "Unknown" field definition
        if this_field not in VLCBOpcode.field_formats:
            logging.warning (f"Warning format field {this_field} not recognised")
            this_field = "Unknown"
        width, kind = VLCBOpcode.field_formats[this_field]
        if len(data) < width:
            # Too few characters left: record what there was and drop it
            parsed[this_field] = f"Insufficient data {data}"
            data = ""
            continue
        chunk, data = data[:width], data[width:]
        # Only "char" fields stay as text, every other type is read as a hex number
        parsed[this_field] = chunk if kind == "char" else int(chunk, 16)

    # Characters beyond the last field are kept (shouldn't normally get this)
    if data:
        parsed["ExtraData"] = data
    return parsed

Utils for use by other parts of the library

bytes_to_addr(byte1, byte2)

Convert 2 byte values into an address id value

Parameters:

Name Type Description Default
byte1 bytes

Most significant byte

required
byte2 bytes

Least significant byte

required

Returns:

Name Type Description
Int int

The address id value

Source code in src/pyvlcb/utils.py
128
129
130
131
132
133
134
135
136
137
138
139
140
def bytes_to_addr (byte1: bytes, byte2: bytes) -> int:
    """Combine 2 byte values into a single address id number

    Both arguments are passed through int() before being combined, with
    byte1 placed in the upper 8 bits.

    Args:
        byte1 (bytes): Most significant byte
        byte2 (bytes): Least significant byte

    Returns:
        Int: The address id value
    """
    high, low = int(byte1), int(byte2)
    return (high << 8) + low

bytes_to_functions(fn1, fn2, fn3)

Sets the value of the functions from a PLOC message. Only returns values for F0 to F12 (others are not included in PLOC). Provided as a list for inclusion in menus etc.

Parameters:

Name Type Description Default
fn1 bytes

Function byte F0 to F4 - bit 5 = dir lighting (F0), bit 6 = direction, bit 7 = res, bit 8 = 0

required
fn2 bytes

Function byte F5 to F8 - bit 5 upwards reserved

required
fn3 bytes

Function byte F9 to F12

required

Returns:

Type Description
List[int]

List of function values as 0 or 1 for each function as off and on

Source code in src/pyvlcb/utils.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def bytes_to_functions (fn1: int, fn2: int, fn3: int) -> List[int]:
    """ Sets the value of the functions from a PLOC message
    Only returns values for F0 to F12 (others not included in PLOC)
    Provides as a list for inclusion in menus etc.

    Args:
        fn1: Function byte F0 to F4 as an integer - F1 to F4 are in the
            low 4 bits, F0 is the next bit up (mask 0b10000)
        fn2: Function byte F5 to F8 as an integer - low 4 bits used
        fn3: Function byte F9 to F12 as an integer - low 4 bits used

    Returns:
        List of 29 function values as 0 or 1 for each function as off and on.
        Index is the function number; entries above F12 are always 0.

    """
    # 29 slots (F0 to F28) - only F0 to F12 can be filled in from these bytes
    function_status = [0] * 29
    # Handle 0 separately as it's in the upper nibble.
    # Normalise to 0 / 1 - the raw masked value would be 16 when F0 is on
    function_status[0] = 1 if fn1 & 0b10000 else 0
    # Each byte carries 4 functions in its low nibble, lowest bit first
    for group, value in enumerate((fn1, fn2, fn3)):
        for bit in range(4):
            function_status[group * 4 + bit + 1] = (value >> bit) & 1

    return function_status

bytes_to_hexstr(byte1, byte2)

Convert 2 bytes to a hex string

Parameters:

Name Type Description Default
byte1 bytes

Most significant byte

required
byte2 bytes

Least significant byte

required

Returns:

Name Type Description
String str

A hex representation of the number

Source code in src/pyvlcb/utils.py
143
144
145
146
147
148
149
150
151
152
153
def bytes_to_hexstr (byte1: int, byte2: int) -> str:
    """Convert 2 bytes to a hex string

    Each value is written as upper case hex, zero padded to at least
    2 characters, and the two results are joined (eg. 1, 255 -> "01FF").

    Args:
        byte1 (int): Most significant byte as an integer
        byte2 (int): Least significant byte as an integer

    Returns:
        String: A hex representation of the number
    """
    # Format spec 02X = upper case hex, zero padded to a minimum width of 2
    return f"{byte1:02X}{byte2:02X}"

dict_to_string(dictionary)

Convert a dict to a string without {} or quotes

Source code in src/pyvlcb/utils.py
43
44
45
46
47
48
49
50
def dict_to_string (dictionary: dict) -> str:
    """ Render a dict as "key = value" pairs separated by " , " (no {} or quotes) """
    pairs = (f"{key} = {value}" for key, value in dictionary.items())
    return " , ".join(pairs)

f_to_bytes(f_num, function_status)

Convert a Fnumber (Loco function ID) to bytes as string representation

Can be used to create the correct format for loco_set_dfun. Limited to the range 0 to 28, which is the maximum for DFUN. Higher function numbers would need to use DFNON/DFOFF.

Parameters:

Name Type Description Default
f_num int

Function number

required
function_status List[int]

List with value of all functions (must be updated before calling)

required

Returns:

Type Description
Tuple[str, str]

Tuple of two 2-character strings representing the bytes

Raises: ValueError

Source code in src/pyvlcb/utils.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def f_to_bytes (f_num: int, function_status: List[int]) -> Tuple[str, str]:
    """ Convert a Fnumber (Loco function ID) to bytes as string representation

    Can be used to create correct format for loco_set_dfun.
    Limited to range 0 to 28 - which is max for DFUN.
    Higher would need to use DFNON/DFOFF.

    Args:
        f_num: Function number (0 to 28). It selects which group of
            functions is encoded; every function in that group is included.
        function_status: Value of all functions, indexed by function number
            (must be updated before calling). Any sequence is accepted, an
            entry counts as on when it is truthy, and missing entries
            count as off. The argument itself is not modified.

    Returns:
        Tuple of two 2-character lowercase hex strings: the group number
        (01 to 05) followed by the bit pattern of the functions in that group

    Raises:
        ValueError: If f_num is not between 0 and 28

    """
    # Must be an F number between 0 and 28
    if f_num < 0 or f_num > 28:
        raise ValueError (f"Fnumber needs to be between 0 and 28. Number provided {f_num}")

    # (highest F number in the group, group number, F numbers from bit 0 upwards)
    groups = (
        (4, 1, (1, 2, 3, 4, 0)),  # F1-F4 use bits 0-3, F0 sits above them in bit 4
        (8, 2, (5, 6, 7, 8)),
        (12, 3, (9, 10, 11, 12)),
        (20, 4, (13, 14, 15, 16, 17, 18, 19, 20)),
        (28, 5, (21, 22, 23, 24, 25, 26, 27, 28)),
    )

    # Work on a copy (so tuples and other sequences are fine) and pad it
    # so every index from 0 to 28 exists
    states = list(function_status)
    states.extend([0] * (29 - len(states)))

    # The range check above guarantees that one of the groups matches
    group_num, members = next(
        (number, f_numbers) for top, number, f_numbers in groups if f_num <= top
    )

    # Set one bit per active function; testing truthiness rather than
    # multiplying by the stored value stops a value other than 0/1 from
    # spilling into a neighbouring function's bit
    bits = 0
    for bit, member in enumerate(members):
        if states[member]:
            bits |= 1 << bit

    # convert to strings before returning
    return (f"{group_num:02x}", f"{bits:02x}")

num_to_1hexstr(num)

Convert number to a byte

Parameters:

Name Type Description Default
num int

Number to convert

required

Returns:

Name Type Description
String str

A hex representation of the number (2 chars)

Source code in src/pyvlcb/utils.py
 6
 7
 8
 9
10
11
12
13
14
15
def num_to_1hexstr (num: int) -> str:
    """Convert number to a 1 byte hex string

    Args:
        num (int): Number to convert

    Returns:
        String: An upper-case hex representation of the number, zero-padded
        to at least 2 chars (longer if num is above 255; a negative number
        keeps its minus sign)
    """
    # 02X = upper-case hex, zero-padded to a minimum width of 2
    return f"{num:02X}"

num_to_2hexstr(num)

Convert number to 2 bytes

Parameters:

Name Type Description Default
num int

Number to convert

required

Returns:

Name Type Description
String str

A hex representation of the number (4 chars)

Source code in src/pyvlcb/utils.py
18
19
20
21
22
23
24
25
26
27
def num_to_2hexstr (num: int) -> str:
    """Convert number to a 2 byte hex string

    Args:
        num (int): Number to convert

    Returns:
        String: An upper-case hex representation of the number, zero-padded
        to at least 4 chars (longer if num is above 65535; a negative
        number keeps its minus sign)
    """
    # 04X = upper-case hex, zero-padded to a minimum width of 4
    return f"{num:04X}"

num_to_4hexstr(num)

Convert number to 4 bytes

Parameters:

Name Type Description Default
num int

Number to convert

required

Returns:

Name Type Description
String str

A hex representation of the number (8 chars)

Source code in src/pyvlcb/utils.py
30
31
32
33
34
35
36
37
38
39
def num_to_4hexstr (num: int) -> str:
    """Convert number to a 4 byte hex string

    Args:
        num (int): Number to convert

    Returns:
        String: An upper-case hex representation of the number, zero-padded
        to at least 8 chars (longer if num is above 0xFFFFFFFF; a negative
        number keeps its minus sign)
    """
    # 08X = upper-case hex, zero-padded to a minimum width of 8
    return f"{num:08X}"