实现备份文件功能
file_name = input("输入备份的文件: ") postion = file_name.rfind(".") file_name_new = file_name[:postion] + "[附件]" + file_name[postion:] f1 = open(file_name,'r',encoding="UTF-8") f2 = open(file_name_new,"w",encoding="UTF-8") while True: content = f1.read(1024) if len(content) == 0: break f2.write(content) f1.close()
f2.close()
文件定位 seek
f.seek(偏移量,0/1/2) 0 表示文件的开头 1 表示当前位置 2 表示文件的结尾
with open("deftest.py","r",encoding="UTF-8") as f:
f.seek(10,0) #从开头偏移了10个字节
print(f.read()) 读到最后
f.seek(0,0) #调到文件开头
f.tell() #查看当前位置
对文件的操作
import os os.rename('first.py','first.py.bak') #重命名 os.remove('abc.txt') #删除文件
os.mkdir('fengjian') #新建文件夹
os.rmdir('fengjian') #删除fengjian文件夹
os.chdir('fengjian') # cd到fengjian目录下
print(os.getcwd()) # 查看所在的路径
os.listdir("/root") #显示listdir目录下的所有文件,并且保存在列表中
修改目录下所有的文件的名字
#!/usr/bin/env python3 import os os.chdir("/root/fengjian/") #cd 到/root/fengjian/目录下 list_info = os.listdir("./") #把目录下所有文件变成列表 for text in list_info: #循环修改名字 os.rename(text,"[china]"+text)
迭代
def get_num(num): if num > 1: return num * get_num(num-1) else: return num reslut = get_num(4) print(reslut)
类定义:
class Feng(object): ''' 定义一个类''' #初始化对象 def __init__(self,name): self.name = name
def __str__(self):
return sele.name #方法 def drink(self): pass
#创建对象:
feng = Feng('feng')
#打印对象名后,得到 __str__ 返回值
print(feng)
烤白薯 class
class Potato(object):
def __init__(self,status,cooklevel):
self.status = status
self.cooklevel = cooklevel
self.condiments = []
def __str__(self):
return "时间为:%s 状态%s 添加的佐料有%s"%(self.cooklevel,self.status,str(self.condiments))
def cook(self,cook_time):
self.cooklevel += cook_time
if self.cooklevel >=0 and self.cooklevel < 3:
self.status = "生的"
elif self.cooklevel >=3 and self.cooklevel < 5:
self.status = "半生不熟"
elif self.cooklevel >= 5 and self.cooklevel < 8:
self.status = "熟了"
elif self.cooklevel >8:
self.status = "烤糊了"
def addcondiments(self, condiments):
self.condiments.append(condiments)
digua = Potato('生的',0)
for i in range(5):
digua.cook(1)
digua.addcondiments("白糖")
print(digua)
房子中存放家具
class House(object): def __init__(self,region,size,house_type): self.region = region self.size = size self.house_type = house_type self.list = [] self.Residual_area = size def __str__(self): msg = ''' 房屋的位置:%s 房屋的面积:%d 房屋的户型:%s 房屋剩余面积:%d 房屋的物品包括:%s '''%(self.region,self.size,self.house_type,self.Residual_area,str(self.list)) return msg def add_items(self,item): self.list.append(item.name) self.Residual_area -= item.size class Bed(object): def __init__(self,name,size): self.name = name self.size = size def __str__(self): return "床的名字%, 床的大小%d"%(self.name,self.size) home = House('北京',120,'三室一厅') print(home) bed1 = Bed('上下床',3) bed2 = Bed('土炕',6) home.add_items(bed1) print(home) home.add_items(bed2) print(home)
class 私有方法应用
class SendMes(object): #私有方法 def __send_msg(self): print("正在发送短信------") #共有方法 def send_mess(self,money): if money > 1: self.__send_msg() else: print("余额不足-----") send = SendMes() send.send_mess(1)
测量对象的引用个数:
import sys class Feng(object): pass t = Feng sys.getrefcount(t)
类调用类的方法, 子类调用父类的方法。
class person(object): def eat(self): print("-----吃-----") def drink(self): print("----喝------") def pay(self): print("----玩------") def happy(self): print("-----乐-----") def hobby(self): print("-----生活-----") class WhitePerson(person): def hobby(self): print('------睡觉------') #person.hobby(self) ####第一种方法:子类调用父类的方法,必须加上self #super().hobby() ####第二种方法: 调用父类的方法 super().hobby() super().hobby()
bairen = WhitePerson()
bairen.hobby()
类的多继承,通过 类.__mro__方法,查看从继承顺序。
class Base(object): def test(self): print("----这是父类的方法") class A(Base): def test(self): print("----这是子类A的方法") class B(Base): def test(self): print("----这是子类B的方法") class C(A,B): pass c = C() print(C.__mro__)
(<class '__main__.C'>, <class '__main__.A'>, <class '__main__.B'>, <class '__main__.Base'>, <class 'object'>)
类属性 和 实例属性,实例对象:
实例属性: 和具体的某个实例对象有关系,并且一个实例对象 和另一个实例对象是不共享属性的。
类属性: 类属性所属于类对象,并且多个实例对象之间共享一个类属性。
实例方法中调用类属性。
class Tool(object): #类属性 num = 0 #实例方法 def __init__(self,name): self.name = name Tool.num += 1 #实例方法中调用类属性 tool1 = Tool("剪刀") tool1 = Tool("水桶") print(Tool.num)
类方法,实例方法,静态方法
可以通过类的名字调用类的方法, 还可以通过类创建的对象,调用这个类方法
class Tool(object): #类属性 num = 0 #类方法 @classmethod def add_num(cls): cls.num += 100 #实例方法 def __init__(self,name): self.name = name #Tool.num += 1 #实例方法中调用类属性 #静态方法 @staticmethod def print_menu(): print("===============") print("==============") tool1 = Tool("剪刀") tool2 = Tool("水桶") #调用类方法 Tool.add_num() #打印类属性 print(Tool.num) #通过对象调用类方法 tool1.add_num() print(Tool.num) #通过类去调用静态方法 Tool.print_menu() #通过实例去调用静态方法 tool1.print_menu()
4s店买车实验
class Person(object): def goto_4s(self,momery): return Four_S_Store().look_car(momery) class Four_S_Store(object): def look_car(self,new_momery): if new_momery <= 100000: return Factory().car_order("捷达") elif new_momery > 100000: return Factory().car_order("迈腾") class Factory(object): def car_order(self,car_type): if car_type == "捷达": return JieDa() elif car_type == "迈腾": return Magotan() class Car(object): def start(self): print("======移动========") def stop(self): print("=====停止==========") class JieDa(Car): def __init__(self): print("捷达") class Magotan(Car): def __init__(self): print("迈腾") man = Person() car = man.goto_4s(100001) car.start()
使用__new__ 方法 创建单实例: 使用同一块内存空间。
class Animal(object): __instance = None __init_flag = False def __new__(cls, *args, **kwargs): if cls.__instance == None: cls.__instance = object.__new__(cls) #调用父类object 的__new__方法,并且保存在类属性__instance中 return Animal.__instance def __init__(self, name): if Animal.__init_flag == False: self.name = name Animal.__init_flag = True dog = Animal("狗") print(id(dog)) print(dog.name) cat = Animal("猫") print(id(cat)) print(cat.name)
异常 try except
第一:
try:
print(num)
except NameError:#异常的名字,如果不是这个异常,那么程序就退出,后续将不执行。
print("捕获到异常")
第二:
try: open('adsfadf.txt','r') except (NameError,FileNotFoundError): #可以把异常的异常的关键字用()保存成列表。 print("捕获到异常")
第三种 捕获所有Exception
try:
open('adfadf.txt')
except Exception: #如果用了 exception,那么意味着只要上面的except没有捕获到异常,这个execpt一定会捕获到, 使用Exception 都能捕获到异常。
print("所有异常都能捕获到")
第四种 as 错误放置到变量中
try:
open('adfadf.txt')
except Exception as feng: #把错误放到feng中, 通过print 打印出来。
print("所有异常都能捕获到")
print(feng)
第五种 else
try:
print("---------111111111111----------------")
except Exception as feng: #把错误放到feng中, 通过print 打印出来。
print("所有异常都能捕获到")
print(feng)
else:
print("没有异常才会执行的功能")
print("=++++++333========")
第六种 finally 不管有没有产生异常,最后都会执行finally
try:
print("---------111111111111----------------")
except Exception as feng: #把错误放到feng中, 通过print 打印出来。
print("所有异常都能捕获到")
print(feng)
else:
print("没有异常才会执行的功能")
finally:
print("---finally---")
print("=++++++333========")
模块:
import random
print(random.randrange(0,16)) # 获取 0 - 16 中间的数字
import os
print(os.__file__) # 查询模块的位置。
安装pygame模块
pip install pygame
从sendmsg.py 导入模块test模块
1 vim sendmsg.py
def test1():
print("sendmsg------test1方法")
def test2():
print("sendmsg------test2方法")
2. vim modulestest.py 导入sendmsg模块
from fengjian import sendmsg
sendmsg.test1()
sendmsg.test2()
from fengjian.sendmsg import test1,test2
test1()
test2()
from fengjian.sendmsg import *
test1()
test2()
__all__ 模块的作用: 为了别人把不需要的方法导入,在列表中放入了想让别人调用的函数名
定义模块,模块中包括 类,全局变量,函数,如果不使用__all__,所有的全部能引用. #__all__ = ["test1"] def test1(): print("sendmsg ---- test1方法") def test2(): print("sendmsg ---- test2方法") num = 200 class SuperMan(object): def __init__(self): print("初始化变量")
引用模块
from fengjian.sendmsg import *
print(test1())
print(test2())
print(num)
man = SuperMan()
#################################################
__all__ = ["test1"]
def test1():
print("sendmsg ---- test1方法")
def test2():
print("sendmsg ---- test2方法")
num = 200
class SuperMan(object):
def __init__(self):
print("初始化变量")
引用模块,只能引用test1,其他引用报错
from fengjian.sendmsg import *
print(test1())
print(test2())
print(num)
man = SuperMan()
模块sys.argv
vim test.py import sys print(sys.argv) python3 test.py abc --------------------- ['test.py','abc']
列表生成式
打印 10 至 70
第一种: i = 10 alist=[] while i <= 77: alist.append(i) 第二种 for i in range(10,77): print(i)
列表生成式 for循环只控制循环的次数
a = [i for i in range(10)]
print(a)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
b = [11 for i in range(1,10)]
[11,11,11,11,11,11,11,11.....]
a = [i for i in range(10) if i%2 ==0]
print(a)
[0, 2, 4, 6, 8]
集合:
a = [11,22,33,11,22,33] print(set(a)) {33, 11, 22}
a = [11,22,33,11,22,33]
print(list(set(a)))
[11,22,33]
浅拷贝: 内存的指向
a = [1,2,3,4,5] b = a id(a) 35411528 id(b) 35411528
深拷贝:
import copy a = [1,2,3,4,5] c = copy.deepcopy(a) print(id(a)) 43489480 print(id(c))
43489672
私有化
- xx: 公有变量
- _x: 单前置下划线,私有化属性或方法,from somemodule import *禁止导入,类对象和子类可以访问
- __xx:双前置下划线,避免与子类中的属性命名冲突,无法在外部直接访问(名字重整所以访问不到)
- __xx__:双前后下划线,用户名字空间的魔法对象或属性。例如:
__init__
, __ 不要自己发明这样的名字 - xx_:单后置下划线,用于避免与Python关键词的冲突
class Person(object): def __init__(self, name, age, taste): self.name = name self._age = age self.__taste = taste def showperson(self): print(self.name) print(self._age) print(self.__taste) def dowork(self): self._work() self.__away() def _work(self): print('my _work') def __away(self): print('my __away') class Student(Person): def construction(self, name, age, taste): self.name = name self._age = age self.__taste = taste def showstudent(self): print(self.name) print(self._age) print(self.__taste) @staticmethod def testbug(): _Bug.showbug() #模块内可以访问,当from cur_module import *时,不导入 class _Bug(object): @staticmethod def showbug(): print("showbug") s1 = Student('jack', 25, 'football') s1.showperson() print('*'*20) #无法访问__taste,导致报错 #s1.showstudent() s1.construction('rose', 30, 'basketball') s1.showperson() print('*'*20) s1.showstudent() print('*'*20) Student.testbug()
可迭代对象 : 1. 字符串,列表,文件,字典
2. generator 生成器,yield
for i in "abc": print(i) for temp in [11,22,33,44,55]: print(temp) with open("deftest.py",encoding="UTF-8") as f: for line in f: print(line) dictlist = { 'name':'feng', 'age': 25, 'job': 'IT' } for i in dictlist: print(i,dictlist[i])
a = (i for i in range(10))
for i in a:
print(i)
可以使用ininstance()判断一个对象是否是 iterable 对象
from collections import Iterable
print(isinstance([],Iterable))
print(isinstance("abc",Iterable))
print(isinstance({},Iterable))
print(isinstance((x for x in range(10)),Iterable))
迭代器
可是被next()函数调用并且不断返回下一个值的对象称为迭代器:Iterator 可是使用isinstance() 判断一个对象是否是Iterator对象: from collections import Iterator print(isinstance([],Iterator)) print(isinstance("abc",Iterator)) print(isinstance({},Iterator)) print(isinstance((x for x in range(10)),Iterator))
#TypeError: 'list' object is not an iterator
a = [11,22,33,44]
next(a)
iter 把可迭代对象 转成 迭代器
a = iter([11,22,33,44]) next(a) print(isinstance(a,Iterator))
闭包:
def wai(num1): def nei(num2): print(num1 + num2) return nei ret = wai(10) ####指向一个函数体#### ret(10)
装饰器
def Validate(func): def func_in(*args,**kwargs): return func(*args,**kwargs) return func_in @Validate def problems(a,b,c): return a+b+c print(problems(1,3,5))
带参数的装饰器
def Validate(arg): def func(funcname): def func_in(*args,**kwargs): return "名字是%s,结果是%d"%(arg,funcname(*args,**kwargs)) return func_in return func
#1. 先执行func_arg("haha")函数,这个函数return 的结果是func这个函数的引用
#2. @func
#3. 使用@func对problems进行装饰
@Validate("haha") def problems(a,b,c): return a+b+c print(problems(1,3,5))
动态添加属性以及方法
import types class Person(object): def __init__(self,name,age): self.name = name self.age = age def eat(self): print("-----%s正在吃东西-------"%self.name) def run(self): print("-----%s正在跑步-------" % self.name) p1 = Person("superman",10) print(p1.name) print(p1.age) p1.eat() #把run函数 添加到Person类中,变成方法 使用types模块的MethodType方法 p1.run = types.MethodType(run,p1) p1.run()
#动态添加属性
p1.addr = "bj"
print(p1.addr)
__slots__ 作用: 不允许添加额外的属性,只能预先定义的
class Person(): __slots__ = ("name","age") p1 = Person() p1.name = "superman" p1.age = 10
#添加 addr属性是报错 p1.addr = "beijing"
File "D:/python-test/new_python/slots.py", line 13, in <module>
p1.addr = "beijing"
AttributeError: 'Person' object has no attribute 'addr'
生成器:generator
把列表生成式[] 改成() b = (x for x in range(10)) <generator object <genexpr> at 0x0000000002203B48>
next(b)
使用yield 生成器
def fun_num(num): a,b = 0,1 i = 0 while i < num: yield b a,b = b,a+b i += 1 #创建一个生成器对象 a = fun_num(10) #让a这个生成器对象开始执行,如果是第一次执行,那么就从fun_num的开始部分执行, #如果 是之前已经执行过了, 那么就从上一次停止的位置开始执行 next(a) next(a)
也可以使用for循环取出
for i in a:
print(i)
内建属性
class Person(object): pass print(dir(Person)) ['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__',
'__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__']
内建函数:
range range( stop) ---> list of integers range(start,stop,step)
map 函数
会根据提供的函数对指定序列做映射
map(function,sequence[,sequence,.....])
function 是一个函数
sequence 是一个或者多个序列,
返回值是一个list
#函数需要一个参数
map(lambda x:x*x,[1,2,3])
#结果是 [1,4,9]
#函数需要两个参数
map(lambda x,y:x+y,[1,2,3],[1,2,3])
结果是 [1,4,6]
def f1(x,y):
return x,y
l1 = [1,2,3,4,5]
l2 = [6,7,8,9,10]
l3 = map(f1,l1,l2)
print(list(l3))
#[(1, 6), (2, 7), (3, 8), (4, 9), (5, 10)]
filter 函数
filter函数会对指定序列执行过滤操作
filter(function or None, sequence )
function:接受一个参数, 返回布尔值True 或者False
sequence: 序列可以是str,tuple , list
filter函数会对序列参数sequence中的每个元素调用function函数,最后的结果
包含调用结果为True的元素
ret = filter(lambda x:x%2,[0,1,2,3,4])
print(list(ret))
# 结果 [1,3]
在python3 中, reduce函数已经被从全局名字空间中移除,现在在functools模块里引入 from functools import reduce ret = reduce(lambda x,y:x+y,[1,2,3,4,5],10) #10先给x, y从列表中取出1, 然后x+y后,把值给x,y继续取列表的值。 print(ret)
sorted 函数,排序 sorted(iterable,reverse=False) sorted([11,5,3,2,12])
集合
#集合去重
a = [1,2,3,3,4,5,3,2,2,2] b = set(a) a = list(b) print(a)
交集
a = [1,2,3,3,4,5,3,2,2,2]
b = [5,6,7]
a = set(a)
b = set(b)
c = a & b
print(list(c))
并集
c = a | b
差集
c = a - b
对称差集
c = a^b
functools 工具函数包
import functools
dir(functools)
['WRAPPER_ASSIGNMENTS', 'WRAPPER_UPDATES', '__builtins__', '__doc__', '__file__', '__name__', '__package__', 'cmp_to_key', 'partial', 'reduce', 'total_ordering', 'update_wrapper', 'wraps']
hashlib 哈希加密 import hashlib m = hashlib.md5() #创建hash对象 m.update('password'.encode("UTF-8")) m.hexdigest()
hash = hashlib.sha512() hash.update('admin'.encode('utf-8')) print(hash.hexdigest())
进程池
from multiprocessing import Pool import time import os import random def worker(name): start_time = time.time() print('%s开始执行,进程号为%d'%(name,os.getpid())) time.sleep(5) stop_time = time.time() print("%s 执行完毕,耗时%0.2f"%(name,stop_time-start_time)) p = Pool(3) for i in range(10): p.apply_async(worker,args=(i,)) #每次循环将会用空闲出来的子进程调用目标 print('------start-------') p.close() #关闭进程池,关闭后p不在接受新的请求 p.join() #等待p中的所有子进程执行完成,必须放在close后 print('-----end------')
进程间通信Queue
from multiprocessing import Queue,Process import time import random def write(q): for var in ['a','b','c']: print("Put %s to queue"%var) q.put(var) time.sleep(random.randint(1,5)) def read(q): while True: if not q.empty(): value = q.get() print('get %s queue'%value) time.sleep(random.randint(1,5)) else: break if __name__ == '__main__': q = Queue(3) pw = Process(target=write,args=(q,)) pr = Process(target=read,args=(q,)) pw.start() pw.join() pr.start() pr.join() print("程序结束")
manager_queue
#!/usr/bin/env python # -*- coding:utf-8 -*- from multiprocessing import Manager,Pool import time import random import os def write(q): print('write进程启动(%d),父进程是%d'%(os.getpid(),os.getppid())) for var in 'fengjian': q.put(var) def read(q): while True: if not q.get(): value = q.get() print('read进程启动(%d),父进程是%d'%(os.getpid(),os.getppid())) else: break if __name__ == "__main__": po = Pool() q = Manager().Queue() po.apply_async(write,(q,)) po.apply_async(read,(q,)) po.close() po.join()
多进程拷贝文件:
from multiprocessing import Pool,Manager import os def CopyFileTask(name,OldFolderName,NewFolderName,queue): OldFolderNamefile = OldFolderName + "/" + name NewFolderNamefile = NewFolderName + "/" + name with open(OldFolderNamefile,'r',encoding="UTF-8") as fr: with open(NewFolderNamefile, 'w',encoding="UTF-8") as fw: for line in fr: fw.write(line) queue.put(name) def main(): #获取文件夹的名字 OldFolderName = input("请输入文件夹的名字: ") FullPath = os.getcwd() NewFolderName = OldFolderName + '_bak' #创建一个文件夹 #if NewFolderName: # os.rmdir(NewFolderName) os.mkdir(NewFolderName) #获取文件夹中所有的文件名字 filenames = os.listdir(OldFolderName) print(filenames) # 使用多进程的方式 copy 原文件夹的文件 pool = Pool(4) queue = Manager().Queue() for name in filenames: pool.apply_async(CopyFileTask,args=(name,OldFolderName,NewFolderName,queue)) num = 0 numall = len(filenames) while True: if not queue.empty(): queue.get() print('已完成%.2f%%拷贝'%(num/numall*100)) num += 1 else: break if __name__ == '__main__': main()
互斥锁
from threading import Thread,Lock import time g_num = 100 def work1(): global g_num #这个线程和work2线程都在抢这个锁,如果有一方上锁成功,那么另一个线程就会堵塞,知道解锁为止 mutex.acquire() for i in range(1000000): g_num += 1 mutex.release() print("---in work1,g_num is %d"%g_num) def work2(): global g_num mutex.acquire() for i in range(1000000): g_num += 1 mutex.release() print("---in work2,g_num is %d"%g_num) #创建互斥锁,这个锁默认没有上锁 mutex = Lock() t1 = Thread(target=work1) t1.start() #time.sleep(1) t2 = Thread(target=work2) t2.start()
多线程使用全局变量
from threading import Thread,Lock import time,threading def work1(): print("------thread name is %s-----"%threading.current_thread().name) #threading.current_thread().name 显示进程的名字 g_num = 100 g_num += 1 print("---in work1,g_num is %d"%g_num) t1 = Thread(target=work1) t1.start() #time.sleep(1) t2 = Thread(target=work1) t2.start()
结果
------thread name is Thread-1-----
---in work1,g_num is 101
------thread name is Thread-2-----
---in work1,g_num is 101
两个线程都在一个函数中执行,但是个是个的变量
from threading import Thread,Lock import time,threading def work1(): g_num = 100 name = threading.current_thread().name print("------thread name is %s-----"%name) if name == "Thread-1": g_num += 1 else: time.sleep(1) print("---进程的名字是%s,g_num is %d"%(name,g_num)) t1 = Thread(target=work1) t1.start() #time.sleep(1) t2 = Thread(target=work1) t2.start() 结果 ------thread name is Thread-1----- ---进程的名字是Thread-1,g_num is 101 ------thread name is Thread-2----- ---进程的名字是Thread-2,g_num is 100
生产者 消费者模型 使用 queue,和 都线程实现
from threading import Lock,Thread from queue import Queue import time class Producer(Thread): def run(self): global q while True: if q.empty: for add in range(100): name = str(add) + "包子" q.put(name) print("生产了"+name) time.sleep(1) class Consumer(Thread): def run(self): global q while True: if q.qsize != 0: name = self.name + "消费了" + q.get() print(name) else: print("没有包子了") if __name__ == '__main__': q = Queue() for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
异步
from multiprocessing import Pool import os import time def work1(): print("-----进程池中的进程----pid=%d, ppid=%d"%(os.getpid(),os.getppid())) for i in range(3): print("-----%d------"%i) time.sleep(1) return "hello world" def work2(name): print("callback 回调的进程pid=%d"%os.getppid()) print("---callbak name is %s"%name) p = Pool() p.apply_async(work1,callback=work2) time.sleep(5) print("---主进程----pid=%d"%os.getppid())
网络 socket
服务器端接受信息: from socket import * udpSocket = socket(AF_INET,SOCK_DGRAM) bindAddr = ("",7788) udpSocket.bind(bindAddr) recvData = udpSocket.recvfrom(1024)
content,destInfo = recvData #内容,("172.16.230.168",52252) print("centent is %s"%content.decode("utf-8")) ############################
客户端发送信息: from socket import * udpSocket = socket(AF_INET,SOCK_DGRAM) for i in range(10): udpSocket.sendto(str(i).encode("utf-8"),("172.16.230.169",7788)) # encode("utf-8") 字符编码不写,报错。
################################
from socket import *
udpSocket = socket(AF_INET,SOCK_DGRAM)
destIp = str(input("请输入目的IP地址: "))
destPort = input("请输入目的端口: ")
destData = str(input("请输入发送的内容: "))
udpSocket.sendto(destData.encode("utf-8"),(destIp,destPort))
简单的聊天室
服务器端 : from socket import * import time def main(): udpSocket = socket(AF_INET,SOCK_DGRAM) bindInfo = ("",55555) udpSocket.bind(bindInfo) while True: content,destInfo = udpSocket.recvfrom(1024) # time.sleep(0.5) print("[%s] %s %s "%(time.ctime(),destInfo[0],content.decode("gb2312"))) #print(content) udpSocket.close() if __name__ == '__main__': main() ################################## 客户端 from socket import * from multiprocessing import Pool import time def taskwork(name): centent = "ubuntu 测试环境" + str(name) udpSocket.sendto(centent.encode("gb2312"),indIP) udpSocket = socket(AF_INET, SOCK_DGRAM) indIP = ("172.16.200.85", 55555) def main(): pool = Pool(8) for i in range(1000): pool.apply_async(taskwork,(i,)) pool.close() pool.join() if __name__ == '__main__': main()