`
f002489
  • 浏览: 262882 次
  • 性别: Icon_minigender_1
  • 来自: 成都
社区版块
存档分类
最新评论

如何用python写个串口通信的程序?

阅读更多

http://www.douban.com/group/topic/7400483/

 

<script></script>

如何用python写个串口通信的程序?

如影随形

2009-07-26 16:08:18 来自: 如影随形(但行好事,莫问前程)

用tKinter做界面

  • 代码人

    2009-07-26 16:11:05 代码人 (敏而好学,杰而不傲)

    http://pyserial.sourceforge.net/


    使用pySerial模块

  • 如影随形

    2009-07-26 16:57:40 如影随形 (但行好事,莫问前程)

    没有代码啊,我找到过一个http://currentlife.blog.sohu.com/53741351.html,他里面上来就调用 serial,我在我的IDLE中 import serial print serial.__doc__,显示没有这个模块。

  • 如影随形

    2009-07-26 17:18:53 如影随形 (但行好事,莫问前程)

    搜狐博客 > 一二三事 > 日志 2007-07-05 | python 串口通信 标签: python 串口 Jeffu:

    我用的是“线程轮寻”方式。
    就是打开串口后,启动一个线程来监听串口数据的进入,有数据时,就做数据的处理(也可以发送一个事件,并携带接收到的数据)。
    我没有用到串口处理太深的东西。
    客户的原程序不能给你,不过我给你改一下吧。
    里面的一些东西,已经经过了处理,要运行,可能你要自己改一下,把没有用的东西去掉。
    我这里已经没有串口设备了,不能调了,你自己处理一下吧,不过基本的东西已经有了。
    =================================================================
    #coding=gb18030


    import sys,threading,time;
    import serial;
    import binascii,encodings;
    import re;
    import socket;


    class ReadThread:
    def __init__(self, Output=None, Port=0, Log=None, i_FirstMethod=True):
    self.l_serial = None;
    self.alive = False;
    self.waitEnd = None;
    self.bFirstMethod = i_FirstMethod;
    self.sendport = '';
    self.log = Log;
    self.output = Output;
    self.port = Port;
    self.re_num = None;


    def waiting(self):
    if not self.waitEnd is None:
    self.waitEnd.wait();


    def SetStopEvent(self):
    if not self.waitEnd is None:
    self.waitEnd.set();
    self.alive = False;
    self.stop();


    def start(self):
    self.l_serial = serial.Serial();
    self.l_serial.port = self.port;
    self.l_serial.baudrate = 9600;
    self.l_serial.timeout = 2;


    self.re_num = re.compile('\d');


    try:
    if not self.output is None:
    self.output.WriteText(u'打开通讯端口\r\n');
    if not self.log is None:
    self.log.info(u'打开通讯端口');
    self.l_serial.open();
    except Exception, ex:
    if self.l_serial.isOpen():
    self.l_serial.close();


    self.l_serial = None;


    if not self.output is None:
    self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
    if not self.log is None:
    self.log.error(u'%s' % ex);
    return False;


    if self.l_serial.isOpen():
    if not self.output is None:
    self.output.WriteText(u'创建接收任务\r\n');
    if not self.log is None:
    self.log.info(u'创建接收任务');
    self.waitEnd = threading.Event();
    self.alive = True;
    self.thread_read = None;
    self.thread_read = threading.Thread(target=self.FirstReader);
    self.thread_read.setDaemon(1);
    self.thread_read.start();
    return True;
    else:
    if not self.output is None:
    self.output.WriteText(u'通讯端口未打开\r\n');
    if not self.log is None:
    self.log.info(u'通讯端口未打开');
    return False;


    def InitHead(self):
    #串口的其它的一些处理
    try:
    time.sleep(3);
    if not self.output is None:
    self.output.WriteText(u'数据接收任务开始连接网络\r\n');
    if not self.log is None:
    self.log.info(u'数据接收任务开始连接网络');
    self.l_serial.flushInput();
    self.l_serial.write('\x11');
    data1 = self.l_serial.read(1024);
    except ValueError,ex:
    if not self.output is None:
    self.output.WriteText(u'出错:\r\n %s\r\n' % ex);
    if not self.log is None:
    self.log.error(u'%s' % ex);
    self.SetStopEvent();
    return;


    if not self.output is None:
    self.output.WriteText(u'开始接收数据\r\n');
    if not self.log is None:
    self.log.info(u'开始接收数据');
    self.output.WriteText(u'===================================\r\n');


    def SendData(self, i_msg):
    lmsg = '';
    isOK = False;
    if isinstance(i_msg, unicode):
    lmsg = i_msg.encode('gb18030');
    else:
    lmsg = i_msg;
    try:
    #发送数据到相应的处理组件
    pass
    except Exception, ex:
    pass;
    return isOK;


    def FirstReader(self):
    data1 = '';
    isQuanJiao = True;
    isFirstMethod = True;
    isEnd = True;
    readCount = 0;
    saveCount = 0;
    RepPos = 0;
    #read Head Infor content
    self.InitHead();


    while self.alive:
    try:
    data = '';
    n = self.l_serial.inWaiting();
    if n:
    data = data + self.l_serial.read(n);
    #print binascii.b2a_hex(data),


    for l in xrange(len(data)):
    if ord(data[l])==0x8E:
    isQuanJiao = True;
    continue;
    if ord(data[l])==0x8F:
    isQuanJiao = False;
    continue;
    if ord(data[l]) == 0x80 or ord(data[l]) == 0x00:
    if len(data1)>10:
    if not self.re_num.search(data1,1) is None:
    saveCount = saveCount + 1;
    if RepPos==0:
    RepPos = self.output.GetInsertionPoint();
    self.output.Remove(RepPos,self.output.GetLastPosition());


    self.SendData(data1);
    data1 = '';
    continue;
    except Exception, ex:
    if not self.log is None:
    self.log.error(u'%s' % ex);


    self.waitEnd.set();
    self.alive = False;


    def stop(self):
    self.alive = False;
    self.thread_read.join();
    if self.l_serial.isOpen():
    self.l_serial.close();
    if not self.output is None:
    self.output.WriteText(u'关闭通迅端口:[%d] \r\n' % self.port);
    if not self.log is None:
    self.log.info(u'关闭通迅端口:[%d]' % self.port);


    def printHex(self, s):
    s1 = binascii.b2a_hex(s);
    print s1;


    #测试用部分
    if __name__ == '__main__':
    rt = ReadThread();
    f = open("sendport.cfg", "r")
    rt.sendport = f.read()
    f.close()
    try:
    if rt.start():
    rt.waiting();
    rt.stop();
    else:
    pass;
    except Exception,se:
    print str(se);


    if rt.alive:
    rt.stop();


    print 'End OK .';
    del rt;

  • 如影随形

    2009-07-26 17:20:45 如影随形 (但行好事,莫问前程)

    这是个Tkinter界面,能不能把他们两个做个拼盘,
    # -*- coding: utf-8 -*-
    from Tkinter import *
    import Tkinter
    from tkMessageBox import *

    class EntryDemo(Frame):

    def __init__(self,parent):

    Frame.__init__(self)
    self.pack(expand=YES,fill=BOTH)
    self.master.title("串口通信")

    self.frame1=Frame(self)
    self.frame1.pack(pady=5)
    self.text1=Entry(self.frame1,name="显示")
    self.text1.bind("<Return>",self.showContents)
    self.text1.pack(side=LEFT,padx=5)

    self.frame2=Frame(self)
    self.frame2.pack(pady=5)
    self.text2=Entry(self.frame2,name="显示")
    self.text2.bind("<Return>",self.showContents)
    self.text2.pack(side=LEFT,padx=5)

    self.sendButton = Button(self.frame1, text="发送", command = self.pressedPlain)
    self.sendButton.pack(side=LEFT, padx =5)

    def showContents(self, event):
    theName=event.widget.winfo_name()
    theContents=event.widget.get()
    showinfo("Message", theName +":"+ theContents)

    def pressedPlain(self):
    showinfo("Message","数据")


    def main():
    root = Tkinter.Tk()
    widget = EntryDemo(root)
    root.mainloop()

    if __name__=="__main__":
    main()

  • 如影随形

    2009-07-26 19:40:14 如影随形 (但行好事,莫问前程)

    我下载了pyserial,解压开有example, serials,把serial文件夹里的.py文件拷贝到python安装目录下的lib文件家里面吗?

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics