
I/O-intensive serial port application: porting from a threading/queue-based design to asynchronous (à la Twisted)

开发者 https://www.devze.com 2023-02-05 03:00 Source: web
So, I've been working on an application for a client that communicates with wireless devices via a Serial (RS-232) "Master". I've currently written the core of the app using threading (below). I've been noticing on #python that the consensus seems to be to NOT use threads and to use Twisted's asynchronous communication abilities.

I haven't been able to find any good examples of using Twisted for asynchronous serial port I/O. I have found Dave Peticolas' 'Twisted Introduction' (thanks nosklo), which I'm currently working through, but it uses sockets rather than serial communication (although the async concept is explained very well).

How would I go about porting this app from threads and queues to Twisted? Are there any advantages or disadvantages? (I have noticed that, on occasion, if a thread hangs it will BSOD the system.)

The Code (msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

INITIAL_MODBUS = 0xFFFF


class Poller:
    """
    Connects to the serial port and polls nodes for data.
    Reads response from node(s) and loads that data into queue.
    Parses qdata and writes that data to database.
    """

    def __init__(self,
            port,
            baudrate,
            parity,
            rtscts,
            xonxoff,
            echo=False):
        try:
            self.serial = serial.serial_for_url(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
        except AttributeError:
            self.serial = serial.Serial(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
        self.com_data_q = None
        self.com_error_q = None
        self.livefeed = LiveDataFeed()
        self.timer = time.time()
        self.dtr_state = True
        self.rts_state = True
        self.break_state = False

    def start(self):
        self.data_q = Queue.Queue()
        self.error_q = Queue.Queue()
        com_error = get_item_from_queue(self.error_q)
        if com_error is not None:
            print 'Error %s' % (com_error)
        self.timer = time.time()
        self.alive = True

        # start monitor thread
        #
        self.mon_thread = threading.Thread(target=self.reader)
        self.mon_thread.setDaemon(1)
        self.mon_thread.start()

        # start sending thread
        #
        self.trans_thread = threading.Thread(target=self.writer)
        self.trans_thread.setDaemon(1)
        self.trans_thread.start()

    def stop(self):
        try:
            self.alive = False
            self.serial.close()
        except (KeyboardInterrupt, SystemExit):
            self.alive = False

    def reader(self):
        """
        Reads data from the serial port using self.mon_thread.
        Displays that data on the screen.
        """
        from rmsg_format import message_crc, message_format
        while self.alive:
            try:
                while self.serial.inWaiting() != 0:

                    # Read node data from the serial port. Data should be 96B.

                    data = self.serial.read(96)
                    data += self.serial.read(self.serial.inWaiting())

                    if len(data) > 0:

                        # Put data in to the data_q object
                        self.data_q.put(data)
                        if len(data) == 96:
                            msg = self.data_q.get()

                            pw = ProtocolWrapper(
                                        header=PROTOCOL_HEADER,
                                        footer=PROTOCOL_FOOTER,
                                        dle=PROTOCOL_DLE)
                            # Feed all the bytes of 'msg' sequentially into pw.input
                            status = map(pw.input, msg)

                            if status[-1] == ProtocolStatus.IN_MSG:

                                # Parse the received CRC into a 16-bit integer
                                rec_crc = message_crc.parse(msg[-4:]).crc

                                # Compute the CRC on the message
                                calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
                                from datetime import datetime
                                ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
                                if rec_crc != calc_crc:
                                    print ts
                                    print 'ERROR: CRC Mismatch'
                                    print msg.encode('hex')
                                else:
                                    #msg = message_format.parse(msg[1:])
                                    #print msg.encode('hex') + "\r\n"
                                    msg = message_format.parse(msg[1:])
                                    print msg
                                    #return msg
                                    gc.collect()
                    time.sleep(.2)
            except (KeyboardInterrupt, SystemExit, Exception, TypeError):
                self.alive = False
                self.serial.close()
                raise

    def writer(self):
        """
        Builds the packet to poll each node for data.
        Writes that data to the serial port using self.trans_thread
        """
        import time
        try:
            while self.alive:
                try:
                    dest_module_code = ['DRILLRIG',
                            'POWERPLANT',
                            'GENSET',
                            'MUDPUMP']
                    dest_ser_no = lambda x: x + 1
                    for code in dest_module_code:
                        if code != 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='%s' % (code),
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                        elif code == 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(1),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                except (KeyboardInterrupt, SystemExit):
                    self.alive = False
                    self.serial.close()
                    raise
        except (KeyboardInterrupt, SystemExit):
            self.alive = False
            self.serial.close()
            raise


def main():
    poller = Poller(
            port='COM4',
            baudrate=115200,
            parity=serial.PARITY_NONE,
            rtscts=0,
            xonxoff=0,
            )
    poller.start()
    poller.reader()
    poller.writer()
    poller.stop()


if __name__ == '__main__':
    main()


It is very difficult (if not impossible) to map a threading/queue design one-to-one onto a design that uses Twisted.

I would suggest getting the hang of Twisted first: its reactor, its use of Protocol, and the protocol-specific methods. Think of it this way: the asynchronous behaviour you had been coding explicitly with threads and queues comes for free from Twisted's reactor and Deferreds.
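As a rough illustration of that shift, the writer thread's sleep-based loop could shrink to a function that sends one poll per call, which a timer would then invoke (with Twisted, something like task.LoopingCall(scheduler.poll_next).start(0.2)). This is only a sketch written for Python 3: PollScheduler and its build_message argument are hypothetical names, and the fake message builder below stands in for the question's build_message_to_send.

```python
import itertools

# (module code, serial number) pairs, in the order the question's writer polls them.
POLL_TARGETS = [
    ("DRILLRIG", 1),
    ("POWERPLANT", 1),
    ("POWERPLANT", 2),
    ("GENSET", 1),
    ("MUDPUMP", 1),
]


class PollScheduler:
    """Hypothetical helper: sends one poll request per call, round-robin."""

    def __init__(self, write, build_message):
        self._write = write                  # e.g. the transport's write method
        self._build_message = build_message  # e.g. a wrapper around build_message_to_send
        self._targets = itertools.cycle(POLL_TARGETS)

    def poll_next(self):
        # No sleep here: the caller (a timer) decides when the next poll happens.
        code, ser_no = next(self._targets)
        self._write(self._build_message(code, ser_no))


# Stand-ins for the real transport and message builder (assumptions for the demo).
sent = []
scheduler = PollScheduler(
    sent.append,
    lambda code, ser_no: ("%s:%d" % (code, ser_no)).encode("ascii"),
)
for _ in range(6):
    scheduler.poll_next()
```

Because poll_next never blocks, nothing needs its own thread; the reactor stays free to deliver incoming bytes between polls.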

Twisted does appear to support serial ports on its reactor through the SerialPort transport class, and the basic structure looks something like this:

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

# YourProtocolClass, Port (e.g. 'COM4') and baudrate are placeholders you supply.
SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate)
reactor.run()

YourProtocolClass is where you handle the various events specific to your serial port communication requirements. The doc/core/examples directory contains examples such as gpsfix.py and mouse.py.
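To make the protocol class a little more concrete, here is a minimal sketch of what it might look like for the 96-byte responses in the question. It is written for Python 3 and is not a drop-in port: NodeProtocol, FRAME_SIZE and the on_frame callback are names made up for illustration, and the CRC check and ProtocolWrapper parsing from the original reader would go inside that callback. The import fallback exists only so the buffering logic can be tried without Twisted installed.

```python
try:
    from twisted.internet.protocol import Protocol
except ImportError:
    # Fallback so the framing logic below can be exercised without Twisted.
    Protocol = object

FRAME_SIZE = 96  # the question's reader expects 96-byte node responses


class NodeProtocol(Protocol):
    """Hypothetical protocol: buffers serial bytes into fixed-size frames."""

    def __init__(self, on_frame):
        self._buffer = b""
        self._on_frame = on_frame  # called once per complete frame

    def dataReceived(self, data):
        # The reactor calls this with whatever bytes have arrived, which may be
        # part of a frame or several frames at once, so accumulate and slice.
        self._buffer += data
        while len(self._buffer) >= FRAME_SIZE:
            frame = self._buffer[:FRAME_SIZE]
            self._buffer = self._buffer[FRAME_SIZE:]
            self._on_frame(frame)


# Simulate the reactor delivering two frames split across uneven chunks.
frames = []
proto = NodeProtocol(frames.append)
proto.dataReceived(b"\x01" * 50)
proto.dataReceived(b"\x01" * 46 + b"\x02" * 96)
```

An instance of this class would be passed as the first argument to SerialPort in place of YourProtocolClass(). The reader thread, its inWaiting() polling and the intermediate queue all disappear, because the reactor calls dataReceived whenever bytes are available.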
