Archive for 多线程

最近看c底层相关的指令,操作,编码,多线程,磁盘文件读写相关的看的比较多。不经理永远不知道这里边的东西有多少,还有很多不易理解的。由于时间仓猝,本来想好好整理一下,作为一个总结,但还是决定只是记一个笔记,而且内容来自搜索引擎,有一部分不是看的官方文档解释等,可能不正确,而且没有写全。以后无聊的时候想起来再搞吧(感觉用不到以后不会再搞了) 先说为什么写这个文章,我碰到一个问题。多线程读写文件,然后单线程读取,线上后读取结果不一致。之前有过分步的测试,是没有问题的。 然后我排查原因,先从读入手,发现数据有问题,然后转到看写。写先通过单独打印几条日志,和对写入的数据错误判断,在某个函数。这时候,我打开编译的debug模式,就是能打印更详细的日志,然后发现问题没了,数据正常了。然后我就玩起了编译参数。发现打印日志就没事,不打印就出问题了,然后我就想到是编译器给我优化过头了,那时候还是开始02的优化。因为那个函数不涉及多线程的操作。然后我去掉 -DNDEBUG参数,然后用assert()判断出是哪一行出了问题。然后实验了几个方法,发现都是可以的,于是网上搜索资料,大体比较了一下这集中方式,选了一种。 1,把变量声明称volatile,主要功能是,编译器不会优化掉这个变量,然后变量的值不会存到寄存器,保证从内存中读取。看我的程序具体也看不出来,除非反汇编代码看看。也懒得看了。 2,__sync_synchronize (...),This builtin issues a full memory barrier. 内存栅栏,这个是个硬件栅栏,效率相比下边不太高。 Memory Barrier (Memory Fence)分为两种,一种软件的,只对编译器起作用,一种是硬件的,看文章说是总线信号控制的,这个计算机组成结构没学好,也懒得细看。 下面几种跟linux内核中的相对应,没看源码,不知道对不对 #define mb() __asm__ __volatile__("mfence":::"memory")这个跟__sync_synchronize一个作用,是硬件的栅栏。 #define rmb() __asm__ __volatile__("lfence":::"memory")不允许将barrier之前的内存读取指令移到barrier之后 (release barrier) #define wmb() __asm__ __volatile__("sfence":::"memory")不允许将barrier之后的内存读取指令移到barrier之前(acquire barrier) #define barrier() __asm__ __volatile__("":::"memory") barrier()是软栅栏, rmb wmb好像分单处理器和多处理器不一样,单处理器为软栅栏。 期间写的程序也有好多cas操作,atomic原子操作,编译器主要用的gcc的,然后直接用的gcc的,官方地址: https://gcc.gnu.org/onlinedocs/gcc/_005f_005fatomic-Builtins.html

Continue

真正使用多线程进行程序的编写,以前学习时的笔记《python threading模块学习join()》,现在用在实践上。 写代码在发现对他不熟悉的时候,总是缩手缩脚的,不知道为什么也不去尝试一下。过分依赖搜索引擎,自从上次写golang读取id3信息的那个程序开始,学会了用英文关键字检索信息,会检索到很多有用的信息,像stackoverflow上的回答。 先介绍一下功能:多线程生成密码,多线程提交数据。密码是排列组合生产的,五个元素全排列5!,这个密码长度是5。5个元素,密码长度为2,5×4  。适合那种通用型密码的破解。 程序代码:

# encoding:utf-8
#
#author:0x55aa
#team:Pax.Mac Team
#time:2012.08.17
#
import urllib2,urllib,time
import threading
from Queue import Queue
from BeautifulSoup import BeautifulSoup

#提交的地址
URL = ""
#密码列表example:['0x55aa','1','b']
passwdlist = ['0x55aa','123','a']
#生产密码长度
passwdlen = 3

#密码生成完毕
passwd_done = False
#队列为空啦,这个可以用Queue.empty()
#post_done = False
#锁,用于输出
lock = threading.Lock()
#tag标识是否找到密码退出
tag = False

#密码
def perm(items, n=None):
    if n is None:
        n = len(items)
    for i in range(len(items)):
        v = items[i:i+1]
        if n == 1:
            yield v
        else:
            rest = items[:i] + items[i+1:]
            for p in perm(rest, n-1):
                yield v + p

def sqlPost(password):
    """提交奥"""
    #提交字段的设置在这里。。
    post_params = [
        ('UserName', 'root'),
        ('Password', password),
        ]
    data = urllib.urlencode(post_params)
    req = urllib2.Request(URL)
    r = urllib2.urlopen(req,data)

    #这里加入返回数据的判断
    """
    soup = BeautifulSoup(r.read())
    list1 = soup.find('value')
    """
    return list1

class PasswordProduce(threading.Thread):
    """生成密码"""
    def __init__(self,queue,first,items,n):
        threading.Thread.__init__(self)
        self.passwd = queue
        self.first = first
        self.items = items
        self.n = n

    def run(self):
        #生成密码
        r = perm(self.items,self.n)
        for i in r:
            p = self.first + ''.join(i)
            #密码插入队列
            self.passwd.put(p)
            """
            if lock.acquire():
                print p
                lock.release()
            """

class Sqlpost(threading.Thread):
    """提交"""
    def __init__(self,queue):
        threading.Thread.__init__(self)
        self.passwd = queue

    def run(self):
        #post提交
        global passwd_done
        while not (passwd_done and self.passwd.empty()):
            """
            if lock.acquire():
                print self.getName(),passwd_done , self.passwd.empty()
                lock.release()
            """
            #取得密码
            if not self.passwd.empty():
                p = self.passwd.get()
            else:
                time.sleep(2)
                continue
            """
            if lock.acquire():
                print p
                lock.release()
            """
            #提交
            r = sqlPost(p)
            #打印返回信息
            global tag
            if r:
                tag = True
            if lock.acquire():
                print "password:",p,">>the result:",r
                #print r
                lock.release()
            if tag:
                break

def main():

    queue = Queue()
    #密码线程
    passwdthread = []
    #post
    postthread = []

    #线程数
    s = len(passwdlist)
    for i in range(s):
        first = passwdlist[i]
        lastlist = passwdlist[:i]+passwdlist[i+1:]
        thelen = passwdlen - 1
        pp = PasswordProduce(queue,first,lastlist,thelen)
        sp = Sqlpost(queue)
        passwdthread.append(pp)
        postthread.append(sp)
        pp.start()
        sp.start()

    for t in passwdthread:
        t.join()
    #设置,密码生产完毕
    global passwd_done
    passwd_done = True

    print "\npassword is produced done\n"

    for t in postthread:
        t.join()

    print "\nAll done!\n"

if __name__ == '__main__':
    main()
排列组合代码Google而来,等再写一篇文章进行分析,这篇写多线程编程。 1.join方法的使用。可以参考上面这篇文章,在密码生产线程都结束后,进行passwd_done变量的设置。 2.queue的使用。queue可以方便的进行线程间的通信数据交换。生产密码添加到queue,post线程提交从队列中获取密码。这里有两个判断条件:passwd_done判断密码生产是否结束,self.passwd.empty()检查队列是否为空。当这两个条件都符合时,密码就全部跑完了。还有一个全局变量tag用于当密码成功后,结束线程。 3.密码为了多线程生成,我取其中一个元素作为密码的首位,也就是有len(passwdlist)个线程。不知道这里用什么方法实现好。 4.数据的打印用了threading.Lock()

Continue

看了oschina上的两个代码,受益匪浅。其中对join()方法不理解,看python官网文档的介绍: join([timeout]):等待直到进程结束。这将阻塞正在调用的线程,直到调用join()方法的线程结束。(好难翻译,应该是这个意思) 哈哈,这个易懂。 join方法,如果一个线程或者一个函数在执行过程中要调用另外一个线程,并且待到其完成以后才能接着执行,那么在调用这个线程时可以使用被调用线程的join方法。

#-*- encoding: gb2312 -*-
import string, threading, time

def thread_main(a):
    global count, mutex
    # 获得线程名
    threadname = threading.currentThread().getName()

    for x in xrange(0, int(a)):
        # 取得锁
        mutex.acquire()
        count = count + 1
        # 释放锁
        mutex.release()
        print threadname, x, count
        time.sleep(1)

def main(num):
    global count, mutex
    threads = []

    count = 1
    # 创建一个锁
    mutex = threading.Lock()
    # 先创建线程对象
    for x in xrange(0, num):
        threads.append(threading.Thread(target=thread_main, args=(10,)))
    # 启动所有线程
    for t in threads:
        t.start()
    # 主线程中等待所有子线程退出
    for t in threads:
        t.join()  

if __name__ == '__main__':
    num = 4
    # 创建4个线程
    main(4)
###################################################################
#-*- encoding: gb2312 -*-
import threading
import time

class Test(threading.Thread):
    def __init__(self, num):
        threading.Thread.__init__(self)
        self._run_num = num

    def run(self):
        global count, mutex
        threadname = threading.currentThread().getName()

        for x in xrange(0, int(self._run_num)):
            mutex.acquire()
            count = count + 1
            mutex.release()
            print threadname, x, count
            time.sleep(1)

if __name__ == '__main__':
    global count, mutex
    threads = []
    num = 4
    count = 1
    # 创建锁
    mutex = threading.Lock()
    # 创建线程对象
    for x in xrange(0, num):
        threads.append(Test(10))
    # 启动线程
    for t in threads:
        t.start()
    # 等待子线程结束
    for t in threads:
        t.join()
在程序中,最后join()方法的调用就明白了,是主进程挨个调用子线程的join()方法。当四个线程都执行完毕后,主线程才会执行下面的代码,在这里也就是退出了。 相对应的在网上一起找到的另一个方法: 3.守护进程 setDaemon() 这个方法基本和join是相反的。当我们在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法啦

Continue