乐闻世界logo
搜索文章和话题

面试题手册

Python 中的上下文管理器(Context Manager)是什么?如何使用?

Python 上下文管理器详解上下文管理器的基本概念上下文管理器是 Python 中用于管理资源的对象,它定义了进入和退出上下文时应该执行的操作。最常见的使用方式是 with 语句。为什么需要上下文管理器# 不好的做法 - 手动管理资源file = open('example.txt', 'r')try: content = file.read() process(content)finally: file.close()# 好的做法 - 使用上下文管理器with open('example.txt', 'r') as file: content = file.read() process(content)# 文件自动关闭上下文管理器协议上下文管理器需要实现两个方法:__enter__(self): 进入上下文时调用__exit__(self, exc_type, exc_val, exc_tb): 退出上下文时调用基本实现class MyContextManager: def __init__(self, resource): self.resource = resource def __enter__(self): print("进入上下文") return self.resource def __exit__(self, exc_type, exc_val, exc_tb): print("退出上下文") # 返回 True 表示抑制异常,False 表示传播异常 return False# 使用上下文管理器with MyContextManager("资源") as resource: print(f"使用资源: {resource}")# 输出:# 进入上下文# 使用资源: 资源# 退出上下文异常处理class SafeContextManager: def __enter__(self): print("进入安全上下文") return self def __exit__(self, exc_type, exc_val, exc_tb): print("退出安全上下文") if exc_type is not None: print(f"捕获到异常: {exc_type.__name__}: {exc_val}") # 返回 True 抑制异常 return True return False# 测试异常处理with SafeContextManager(): print("执行操作") raise ValueError("测试异常")print("程序继续执行")# 输出:# 进入安全上下文# 执行操作# 退出安全上下文# 捕获到异常: ValueError: 测试异常# 程序继续执行contextlib 模块@contextmanager 装饰器@contextmanager 装饰器简化了上下文管理器的创建。from contextlib import contextmanager@contextmanagerdef simple_context(): print("进入上下文") try: yield "资源" finally: print("退出上下文")# 使用with simple_context() as resource: print(f"使用资源: {resource}")带异常处理的上下文管理器from contextlib import contextmanager@contextmanagerdef error_handling_context(): print("进入错误处理上下文") try: yield except ValueError as e: print(f"处理 ValueError: {e}") raise # 重新抛出异常 finally: print("清理资源")# 使用try: with error_handling_context(): print("执行操作") raise ValueError("测试异常")except ValueError: print("捕获到重新抛出的异常")closing 函数closing 函数为没有上下文管理器协议的对象创建上下文管理器。from contextlib import closingclass Resource: def __init__(self, name): self.name = name def close(self): print(f"关闭资源: {self.name}")# 使用 closingwith closing(Resource("数据库连接")) as resource: print(f"使用资源: {resource.name}")# 资源自动关闭suppress 函数suppress 函数用于忽略指定的异常。from contextlib import suppress# 忽略 FileNotFoundErrorwith suppress(FileNotFoundError): with open('nonexistent.txt', 'r') as f: content = f.read()print("程序继续执行")# 忽略多个异常with suppress(FileNotFoundError, PermissionError): with open('protected.txt', 'r') as f: content = f.read()redirectstdout 和 redirectstderrfrom contextlib import redirect_stdout, redirect_stderrimport io# 重定向标准输出output = io.StringIO()with redirect_stdout(output): print("这条消息被重定向")print(f"捕获的输出: {output.getvalue()}")# 重定向标准错误error_output = io.StringIO()with redirect_stderr(error_output): print("错误消息", file=sys.stderr)print(f"捕获的错误: {error_output.getvalue()}")实际应用场景1. 文件操作# 自动关闭文件with open('input.txt', 'r') as input_file: with open('output.txt', 'w') as output_file: for line in input_file: output_file.write(line.upper())# 两个文件都会自动关闭2. 数据库连接import sqlite3from contextlib import contextmanager@contextmanagerdef database_connection(db_path): conn = sqlite3.connect(db_path) try: yield conn finally: conn.close()# 使用with database_connection('example.db') as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users") results = cursor.fetchall()# 连接自动关闭3. 锁管理import threadingfrom contextlib import contextmanagerclass LockManager: def __init__(self): self.lock = threading.Lock() @contextmanager def acquire(self): self.lock.acquire() try: yield finally: self.lock.release()# 使用lock_manager = LockManager()with lock_manager.acquire(): # 临界区代码 print("执行临界区操作")# 锁自动释放4. 临时目录import tempfileimport os# 创建临时目录with tempfile.TemporaryDirectory() as temp_dir: print(f"临时目录: {temp_dir}") temp_file = os.path.join(temp_dir, 'temp.txt') with open(temp_file, 'w') as f: f.write("临时数据") # 临时目录及其内容在退出时自动删除# 临时目录已被删除5. 计时器import timefrom contextlib import contextmanager@contextmanagerdef timer(name): start_time = time.time() yield end_time = time.time() print(f"{name} 耗时: {end_time - start_time:.4f} 秒")# 使用with timer("数据处理"): data = [i ** 2 for i in range(1000000)] sum(data)# 输出: 数据处理 耗时: 0.1234 秒6. 临时改变配置from contextlib import contextmanagerclass Config: def __init__(self): self.debug = False self.verbose = Falseconfig = Config()@contextmanagerdef temporary_config(config_obj, **kwargs): """临时修改配置""" original_values = {} # 保存原始值 for key, value in kwargs.items(): original_values[key] = getattr(config_obj, key) setattr(config_obj, key, value) try: yield finally: # 恢复原始值 for key, value in original_values.items(): setattr(config_obj, key, value)# 使用print(f"调试模式: {config.debug}") # Falsewith temporary_config(config, debug=True, verbose=True): print(f"调试模式: {config.debug}") # True print(f"详细模式: {config.verbose}") # Trueprint(f"调试模式: {config.debug}") # False7. 事务管理from contextlib import contextmanagerclass Database: def __init__(self): self.in_transaction = False self.data = {} def begin_transaction(self): self.in_transaction = True self.transaction_data = self.data.copy() def commit(self): self.in_transaction = False print("事务提交") def rollback(self): self.in_transaction = False self.data = self.transaction_data print("事务回滚")@contextmanagerdef transaction(db): db.begin_transaction() try: yield db db.commit() except Exception as e: db.rollback() raise# 使用db = Database()try: with transaction(db): db.data['key1'] = 'value1' db.data['key2'] = 'value2' # raise Exception("测试异常") # 会触发回滚except Exception as e: print(f"事务失败: {e}")print(db.data)嵌套上下文管理器使用多个 with 语句# 嵌套使用with open('file1.txt', 'r') as f1: with open('file2.txt', 'r') as f2: content1 = f1.read() content2 = f2.read() # 处理两个文件使用 contextlib.ExitStackExitStack 允许动态管理多个上下文管理器。from contextlib import ExitStackfiles = ['file1.txt', 'file2.txt', 'file3.txt']with ExitStack() as stack: file_handles = [stack.enter_context(open(f, 'r')) for f in files] contents = [f.read() for f in file_handles] # 所有文件在退出时自动关闭条件性上下文管理器from contextlib import ExitStack, nullcontextdef get_context(use_context): if use_context: return MyContextManager("资源") else: return nullcontext()# 条件性使用上下文管理器with get_context(True) as resource: if resource is not None: print(f"使用资源: {resource}") else: print("不使用上下文管理器")异步上下文管理器异步上下文管理器协议异步上下文管理器实现:__aenter__(self): 异步进入上下文__aexit__(self, exc_type, exc_val, exc_tb): 异步退出上下文class AsyncContextManager: def __init__(self, resource): self.resource = resource async def __aenter__(self): print("异步进入上下文") await self.connect() return self.resource async def __aexit__(self, exc_type, exc_val, exc_tb): print("异步退出上下文") await self.disconnect() async def connect(self): print("连接资源...") async def disconnect(self): print("断开连接...")# 使用异步上下文管理器async def use_async_context(): async with AsyncContextManager("异步资源") as resource: print(f"使用资源: {resource}")# 运行import asyncioasyncio.run(use_async_context())asynccontextmanager 装饰器from contextlib import asynccontextmanager@asynccontextmanagerasync def async_resource(): print("获取异步资源") try: yield "异步资源" finally: print("释放异步资源")async def use_async_resource(): async with async_resource() as resource: print(f"使用资源: {resource}")asyncio.run(use_async_resource())最佳实践1. 确保资源清理# 好的做法 - 使用 finally 确保清理class ResourceManager: def __enter__(self): self.resource = acquire_resource() return self.resource def __exit__(self, exc_type, exc_val, exc_tb): release_resource(self.resource) return False2. 正确处理异常# 好的做法 - 区分异常类型class SafeContextManager: def __exit__(self, exc_type, exc_val, exc_tb): if exc_type is None: print("正常退出") elif issubclass(exc_type, (ValueError, TypeError)): print(f"处理预期异常: {exc_val}") return True # 抑制预期异常 else: print(f"未处理异常: {exc_val}") return False # 传播未处理异常3. 提供有用的错误信息class DatabaseContextManager: def __enter__(self): try: self.connection = connect_to_database() return self.connection except ConnectionError as e: raise RuntimeError(f"无法连接到数据库: {e}") def __exit__(self, exc_type, exc_val, exc_tb): if self.connection: try: self.connection.close() except Exception as e: print(f"关闭连接时出错: {e}")4. 支持上下文管理器链class ChainedContextManager: def __init__(self, *managers): self.managers = managers def __enter__(self): self.entered_managers = [] try: for manager in self.managers: self.entered_managers.append(manager.__enter__()) return self.entered_managers[-1] except: # 如果失败,清理已进入的上下文 self.__exit__(None, None, None) raise def __exit__(self, exc_type, exc_val, exc_tb): # 反向退出上下文 for manager in reversed(self.entered_managers): manager.__exit__(exc_type, exc_val, exc_tb) return False总结Python 上下文管理器的核心概念:基本协议:实现 __enter__ 和 __exit__ 方法with 语句:自动管理资源的进入和退出异常处理:在 __exit__ 中处理异常contextlib 模块:简化上下文管理器的创建实际应用:文件操作、数据库连接、锁管理等嵌套管理:使用多个 with 语句或 ExitStack异步支持:异步上下文管理器用于异步代码上下文管理器的优势:自动资源管理,避免资源泄漏代码更简洁、更易读异常安全,确保资源清理支持嵌套和链式使用掌握上下文管理器,能够编写出更安全、更优雅的 Python 代码。
阅读 0·2月21日 17:10

Python 中的 GIL(全局解释器锁)是什么?如何避免 GIL 的影响?

Python GIL(全局解释器锁)详解什么是 GILGIL(Global Interpreter Lock,全局解释器锁)是 Python 解释器(主要是 CPython)中的一个互斥锁,它确保在任何时候只有一个线程在执行 Python 字节码。这意味着即使在多核 CPU 上,Python 的多线程程序也无法真正实现并行执行。GIL 存在的原因1. 内存管理安全Python 使用引用计数(Reference Counting)来管理内存,每个对象都有一个引用计数器。当引用计数降为 0 时,对象会被自动回收。如果没有 GIL,多个线程同时修改引用计数会导致竞态条件(Race Condition)。2. C 扩展兼容性许多 Python 的 C 扩展库(如 NumPy、Pandas)都不是线程安全的,GIL 保护了这些扩展库的安全性。3. 实现简单性GIL 是一个相对简单的解决方案,避免了复杂的细粒度锁机制。GIL 的工作机制import threadingimport timedef count_down(n): while n > 0: n -= 1# 单线程执行start = time.time()count_down(100000000)print(f"单线程耗时: {time.time() - start:.4f} 秒")# 多线程执行start = time.time()t1 = threading.Thread(target=count_down, args=(50000000,))t2 = threading.Thread(target=count_down, args=(50000000,))t1.start()t2.start()t1.join()t2.join()print(f"多线程耗时: {time.time() - start:.4f} 秒")在 CPU 密集型任务中,多线程可能比单线程更慢,因为存在 GIL 和线程切换的开销。GIL 的影响场景1. CPU 密集型任务(受 GIL 影响大)import threadingimport timedef cpu_bound_task(n): result = 0 for i in range(n): result += i ** 2 return result# 单线程start = time.time()result1 = cpu_bound_task(1000000)result2 = cpu_bound_task(1000000)print(f"单线程结果: {result1 + result2}, 耗时: {time.time() - start:.4f} 秒")# 多线程start = time.time()t1 = threading.Thread(target=lambda: cpu_bound_task(1000000))t2 = threading.Thread(target=lambda: cpu_bound_task(1000000))t1.start()t2.start()t1.join()t2.join()print(f"多线程耗时: {time.time() - start:.4f} 秒")2. I/O 密集型任务(受 GIL 影响小)import threadingimport timeimport requestsdef download_url(url): response = requests.get(url) return len(response.content)urls = [ "https://www.example.com", "https://www.google.com", "https://www.github.com",]# 单线程start = time.time()for url in urls: download_url(url)print(f"单线程耗时: {time.time() - start:.4f} 秒")# 多线程start = time.time()threads = [threading.Thread(target=download_url, args=(url,)) for url in urls]for t in threads: t.start()for t in threads: t.join()print(f"多线程耗时: {time.time() - start:.4f} 秒")在 I/O 密集型任务中,多线程可以显著提高性能,因为线程在等待 I/O 时会释放 GIL。绕过 GIL 的方法1. 使用多进程(multiprocessing)import multiprocessingimport timedef cpu_bound_task(n): result = 0 for i in range(n): result += i ** 2 return resultif __name__ == '__main__': # 单进程 start = time.time() result1 = cpu_bound_task(1000000) result2 = cpu_bound_task(1000000) print(f"单进程耗时: {time.time() - start:.4f} 秒") # 多进程 start = time.time() pool = multiprocessing.Pool(processes=2) results = pool.map(cpu_bound_task, [1000000, 1000000]) pool.close() pool.join() print(f"多进程耗时: {time.time() - start:.4f} 秒")多进程每个进程有独立的 Python 解释器和 GIL,可以真正实现并行计算。2. 使用异步编程(asyncio)import asyncioimport aiohttpimport timeasync def fetch_url(session, url): async with session.get(url) as response: return await response.text()async def main(urls): async with aiohttp.ClientSession() as session: tasks = [fetch_url(session, url) for url in urls] return await asyncio.gather(*tasks)urls = [ "https://www.example.com", "https://www.google.com", "https://www.github.com",]start = time.time()asyncio.run(main(urls))print(f"异步耗时: {time.time() - start:.4f} 秒")3. 使用 C 扩展或 Cython# 使用 Cython 编写的模块# mymodule.pyxdef fast_function(int n): cdef int i cdef int result = 0 for i in range(n): result += i * i return resultCython 代码可以释放 GIL,实现真正的并行计算。4. 使用 NumPy 等优化库import numpy as npimport time# NumPy 内部操作会释放 GILarr1 = np.random.rand(1000000)arr2 = np.random.rand(1000000)start = time.time()result = np.dot(arr1, arr2)print(f"NumPy 耗时: {time.time() - start:.4f} 秒")GIL 的释放时机Python 解释器在以下情况会释放 GIL:I/O 操作:文件读写、网络请求等时间片到期:默认每 1000 个字节码指令检查一次显式释放:某些 C 扩展可以手动释放 GIL长时间操作:某些长时间运行的操作会释放 GILimport threadingimport timedef test_gil_release(): print(f"线程 {threading.current_thread().name} 开始") time.sleep(1) # I/O 操作,释放 GIL print(f"线程 {threading.current_thread().name} 结束")t1 = threading.Thread(target=test_gil_release, name="Thread-1")t2 = threading.Thread(target=test_gil_release, name="Thread-2")t1.start()t2.start()t1.join()t2.join()不同 Python 实现的 GILCPython:有 GILJython:无 GIL(基于 JVM)IronPython:无 GIL(基于 .NET)PyPy:有 GIL,但性能更好Stackless Python:有 GIL,但支持微线程性能优化建议1. 选择合适的并发模型# CPU 密集型:使用多进程from multiprocessing import Pooldef process_data(data): return sum(x * x for x in data)with Pool(4) as pool: results = pool.map(process_data, data_chunks)2. I/O 密集型:使用多线程或异步# 多线程import threadingdef io_task(url): # I/O 操作 passthreads = [threading.Thread(target=io_task, args=(url,)) for url in urls]for t in threads: t.start()for t in threads: t.join()# 或使用异步import asyncioasync def async_io_task(url): # 异步 I/O 操作 passasync def main(): await asyncio.gather(*[async_io_task(url) for url in urls])asyncio.run(main())3. 混合使用from multiprocessing import Poolimport threadingdef worker(data_chunk): # 每个进程内部可以使用线程处理 I/O results = [] threads = [] for item in data_chunk: t = threading.Thread(target=process_item, args=(item, results)) threads.append(t) t.start() for t in threads: t.join() return resultswith Pool(4) as pool: results = pool.map(worker, data_chunks)总结GIL 的优点简化内存管理,避免复杂的锁机制保护 C 扩展的线程安全单线程性能优秀GIL 的缺点限制了多线程在 CPU 密集型任务中的性能无法充分利用多核 CPU在某些场景下性能不如其他语言最佳实践I/O 密集型:使用多线程或异步编程CPU 密集型:使用多进程或考虑其他语言混合型:结合多进程和多线程性能关键:使用 Cython、NumPy 等优化工具理解 GIL 的工作原理和影响,有助于选择合适的并发策略,编写高效的 Python 程序。
阅读 0·2月21日 17:10

TCP Nagle 算法的原理和作用是什么?

TCP Nagle 算法详解Nagle 算法是一种用于减少网络中小数据包数量的算法,通过合并多个小数据包来提高传输效率。Nagle 算法原理算法规则数据包小于 MSS:如果待发送的数据包小于 MSS(最大报文段大小)等待 ACK:等待前一个数据包的 ACK 到达合并发送:将多个小数据包合并成一个大数据包发送超时机制:如果 ACK 超时未到达,立即发送当前数据包工作流程发送方 → [数据包1 < MSS] → 等待ACK → [数据包2 < MSS] → 合并发送 → 接收方Nagle 算法的优势1. 减少网络负载减少数据包数量:合并多个小数据包,减少网络中的数据包数量降低带宽消耗:减少 TCP 首部开销(每个数据包 20-60 字节)提高网络效率:减少路由器和交换机的处理负担2. 提高吞吐量减少 ACK 数量:合并发送减少 ACK 数量降低延迟:减少往返次数提高带宽利用率:更充分地利用网络带宽Nagle 算法的问题1. 增加延迟等待 ACK:需要等待前一个数据包的 ACK 才能发送下一个数据包实时性差:对实时性要求高的应用影响较大延迟累积:多个小数据包的延迟会累积2. 与延迟 ACK 冲突延迟 ACK:接收方延迟发送 ACK,减少 ACK 数量双重延迟:Nagle 算法等待 ACK + 延迟 ACK = 更大的延迟影响性能:可能导致 40ms、200ms 甚至更大的延迟禁用 Nagle 算法适用场景实时性要求高:在线游戏、实时音视频等小数据包频繁:需要立即发送小数据包低延迟优先:延迟比吞吐量更重要配置方法Linux 系统配置# 禁用 Nagle 算法(TCP_NODELAY)sysctl -w net.ipv4.tcp_low_latency=1编程配置(Python)import socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)编程配置(C)int flag = 1;setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));Nagle 算法 vs 延迟 ACKNagle 算法位置:发送方目的:减少小数据包数量机制:等待 ACK 到达后再发送下一个数据包延迟 ACK位置:接收方目的:减少 ACK 数量机制:延迟发送 ACK,等待多个数据包到达后一起确认冲突解决禁用 Nagle 算法:发送方立即发送小数据包禁用延迟 ACK:接收方立即发送 ACK权衡:根据应用场景选择合适的策略相关问题为什么 Nagle 算法会增加延迟?什么时候应该禁用 Nagle 算法?Nagle 算法和延迟 ACK 如何配合使用?
阅读 0·2月21日 17:10

TCP Keep-Alive 机制的作用和原理是什么?

TCP Keep-Alive 机制详解TCP Keep-Alive 是一种检测连接是否存活的机制,用于及时发现和清理失效的连接。Keep-Alive 机制原理工作流程空闲等待:连接在空闲一段时间后(默认 2 小时),开始发送 Keep-Alive 探测包发送探测:发送一个不包含数据的 TCP 报文段,序列号为当前序列号减 1等待响应:收到 ACK:连接正常,重置计时器收到 RST:连接被对方重置,关闭连接超时未响应:连接可能失效,继续探测探测参数tcpkeepalivetime:连接空闲多长时间后开始发送探测(默认 7200 秒)tcpkeepaliveintvl:探测包发送间隔(默认 75 秒)tcpkeepaliveprobes:最大探测次数(默认 9 次)Keep-Alive 的作用1. 检测失效连接网络中断、设备故障等导致连接失效及时发现并清理失效连接,释放资源2. 防止连接假死应用层未正常关闭连接,但连接已失效Keep-Alive 可以检测到这种情况3. 保持连接活跃某些中间设备(如 NAT)会清理长时间空闲的连接Keep-Alive 可以保持连接不被清理Keep-Alive 的缺点1. 资源消耗需要维护连接状态和计时器大量连接时,Keep-Alive 会增加系统负担2. 误判风险网络延迟可能导致误判连接失效可能误关闭正常但延迟高的连接3. 不及时默认等待时间较长(2 小时)无法快速检测到连接失效应用层心跳 vs Keep-AliveKeep-Alive优点:操作系统级别支持,无需应用层实现缺点:参数固定,不够灵活,等待时间长应用层心跳优点:灵活可控,可以携带业务数据,检测更及时缺点:需要应用层实现,增加开发成本配置示例Linux 系统配置# 查看 Keep-Alive 参数sysctl net.ipv4.tcp_keepalive_timesysctl net.ipv4.tcp_keepalive_intvlsysctl net.ipv4.tcp_keepalive_probes# 修改 Keep-Alive 参数sysctl -w net.ipv4.tcp_keepalive_time=600sysctl -w net.ipv4.tcp_keepalive_intvl=30sysctl -w net.ipv4.tcp_keepalive_probes=3编程配置(Python)import socketsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 600)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30)sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 3)相关问题Keep-Alive 和心跳包有什么区别?如何优化 Keep-Alive 参数?为什么默认 Keep-Alive 时间这么长?
阅读 0·2月21日 17:10

Python 面向对象编程的核心概念有哪些?

Python 面向对象编程详解类与对象的基本概念类(Class)类是创建对象的模板,定义了对象的属性和方法。类是抽象的,不占用内存。对象(Object)对象是类的实例,占用内存,具有具体的属性值。class Person: def __init__(self, name, age): self.name = name # 实例属性 self.age = age def say_hello(self): # 实例方法 print(f"Hello, I'm {self.name}, {self.age} years old")# 创建对象person1 = Person("Alice", 25)person2 = Person("Bob", 30)person1.say_hello() # Hello, I'm Alice, 25 years oldperson2.say_hello() # Hello, I'm Bob, 30 years old类属性与实例属性类属性类属性属于类本身,所有实例共享同一个类属性。class Dog: species = "Canis familiaris" # 类属性 count = 0 # 类属性 def __init__(self, name): self.name = name # 实例属性 Dog.count += 1dog1 = Dog("Buddy")dog2 = Dog("Max")print(dog1.species) # Canis familiarisprint(dog2.species) # Canis familiarisprint(Dog.count) # 2# 修改类属性Dog.species = "Canis lupus"print(dog1.species) # Canis lupusprint(dog2.species) # Canis lupus实例属性实例属性属于单个对象实例,每个实例有自己独立的副本。class Car: def __init__(self, brand, color): self.brand = brand # 实例属性 self.color = color # 实例属性car1 = Car("Toyota", "Red")car2 = Car("Honda", "Blue")print(car1.brand) # Toyotaprint(car2.brand) # Honda# 修改实例属性car1.color = "Green"print(car1.color) # Greenprint(car2.color) # Blue方法类型实例方法实例方法是最常用的方法,第一个参数是 self,表示对象本身。class Circle: def __init__(self, radius): self.radius = radius def area(self): # 实例方法 return 3.14159 * self.radius ** 2 def circumference(self): # 实例方法 return 2 * 3.14159 * self.radiuscircle = Circle(5)print(circle.area()) # 78.53975print(circle.circumference()) # 31.4159类方法类方法使用 @classmethod 装饰器,第一个参数是 cls,表示类本身。class Person: count = 0 def __init__(self, name): self.name = name Person.count += 1 @classmethod def get_count(cls): # 类方法 return cls.count @classmethod def create_from_string(cls, name_str): # 类方法作为工厂方法 name = name_str.strip().title() return cls(name)person1 = Person("Alice")person2 = Person.create_from_string(" bob ")print(Person.get_count()) # 2print(person2.name) # Bob静态方法静态方法使用 @staticmethod 装饰器,不需要 self 或 cls 参数。class MathUtils: @staticmethod def add(a, b): # 静态方法 return a + b @staticmethod def multiply(a, b): # 静态方法 return a * b# 直接通过类调用print(MathUtils.add(3, 5)) # 8print(MathUtils.multiply(4, 6)) # 24# 也可以通过对象调用math_utils = MathUtils()print(math_utils.add(3, 5)) # 8继承单继承class Animal: def __init__(self, name): self.name = name def speak(self): passclass Dog(Animal): # 继承 Animal 类 def speak(self): return f"{self.name} says Woof!"class Cat(Animal): # 继承 Animal 类 def speak(self): return f"{self.name} says Meow!"dog = Dog("Buddy")cat = Cat("Whiskers")print(dog.speak()) # Buddy says Woof!print(cat.speak()) # Whiskers says Meow!多继承class Flyable: def fly(self): return "Flying"class Swimmable: def swim(self): return "Swimming"class Duck(Flyable, Swimmable): # 多继承 def __init__(self, name): self.name = name def quack(self): return f"{self.name} says Quack!"duck = Duck("Donald")print(duck.fly()) # Flyingprint(duck.swim()) # Swimmingprint(duck.quack()) # Donald says Quack!方法解析顺序(MRO)class A: def method(self): print("A")class B(A): def method(self): print("B")class C(A): def method(self): print("C")class D(B, C): # 多继承 passd = D()d.method() # B# 查看 MROprint(D.__mro__)# (<class '__main__.D'>, <class '__main__.B'>, <class '__main__.C'>, <class '__main__.A'>, <class 'object'>)多态多态允许不同类的对象对同一消息做出不同的响应。class Shape: def area(self): passclass Rectangle(Shape): def __init__(self, width, height): self.width = width self.height = height def area(self): return self.width * self.heightclass Circle(Shape): def __init__(self, radius): self.radius = radius def area(self): return 3.14159 * self.radius ** 2def calculate_area(shape): return shape.area()# 多态:不同对象调用相同方法rectangle = Rectangle(5, 3)circle = Circle(4)print(calculate_area(rectangle)) # 15print(calculate_area(circle)) # 50.26544封装封装是将数据和操作数据的方法绑定在一起,并隐藏内部实现细节。私有属性和方法class BankAccount: def __init__(self, account_number, balance): self.account_number = account_number self.__balance = balance # 私有属性(双下划线) def deposit(self, amount): if amount > 0: self.__balance += amount return True return False def withdraw(self, amount): if 0 < amount <= self.__balance: self.__balance -= amount return True return False def get_balance(self): # 访问私有属性 return self.__balance def __validate_amount(self, amount): # 私有方法 return amount > 0account = BankAccount("123456", 1000)account.deposit(500)account.withdraw(200)print(account.get_balance()) # 1300# 无法直接访问私有属性# print(account.__balance) # AttributeError属性装饰器class Temperature: def __init__(self, celsius): self.__celsius = celsius @property def celsius(self): # getter return self.__celsius @celsius.setter def celsius(self, value): # setter if value < -273.15: raise ValueError("Temperature below absolute zero") self.__celsius = value @property def fahrenheit(self): # 只读属性 return self.__celsius * 9/5 + 32temp = Temperature(25)print(temp.celsius) # 25print(temp.fahrenheit) # 77.0temp.celsius = 30print(temp.celsius) # 30# temp.celsius = -300 # ValueError抽象类与接口抽象基类(ABC)from abc import ABC, abstractmethodclass Animal(ABC): # 抽象基类 @abstractmethod def speak(self): # 抽象方法 pass @abstractmethod def move(self): # 抽象方法 passclass Dog(Animal): def speak(self): return "Woof!" def move(self): return "Running"class Cat(Animal): def speak(self): return "Meow!" def move(self): return "Prowling"# 无法直接实例化抽象类# animal = Animal() # TypeErrordog = Dog()cat = Cat()print(dog.speak()) # Woof!print(cat.move()) # Prowling特殊方法(魔术方法)常用特殊方法class Vector: def __init__(self, x, y): self.x = x self.y = y def __str__(self): # 字符串表示 return f"Vector({self.x}, {self.y})" def __repr__(self): # 开发者表示 return f"Vector(x={self.x}, y={self.y})" def __add__(self, other): # 加法运算 return Vector(self.x + other.x, self.y + other.y) def __eq__(self, other): # 相等比较 return self.x == other.x and self.y == other.y def __len__(self): # 长度 return int((self.x ** 2 + self.y ** 2) ** 0.5)v1 = Vector(3, 4)v2 = Vector(1, 2)print(v1) # Vector(3, 4)print(repr(v1)) # Vector(x=3, y=4)print(v1 + v2) # Vector(4, 6)print(v1 == Vector(3, 4)) # Trueprint(len(v1)) # 5上下文管理器class FileManager: def __init__(self, filename, mode): self.filename = filename self.mode = mode self.file = None def __enter__(self): # 进入上下文 self.file = open(self.filename, self.mode) return self.file def __exit__(self, exc_type, exc_val, exc_tb): # 退出上下文 if self.file: self.file.close() return False# 使用上下文管理器with FileManager("example.txt", "w") as f: f.write("Hello, World!")# 文件自动关闭设计模式单例模式class Singleton: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) return cls._instances1 = Singleton()s2 = Singleton()print(s1 is s2) # True工厂模式from abc import ABC, abstractmethodclass Animal(ABC): @abstractmethod def speak(self): passclass Dog(Animal): def speak(self): return "Woof!"class Cat(Animal): def speak(self): return "Meow!"class AnimalFactory: @staticmethod def create_animal(animal_type): if animal_type == "dog": return Dog() elif animal_type == "cat": return Cat() else: raise ValueError(f"Unknown animal type: {animal_type}")dog = AnimalFactory.create_animal("dog")cat = AnimalFactory.create_animal("cat")print(dog.speak()) # Woof!print(cat.speak()) # Meow!最佳实践1. 使用组合优于继承# 不好的做法 - 过度使用继承class FlyingDog(Dog, Flyable): pass# 好的做法 - 使用组合class Robot: def __init__(self, flying_ability): self.flying_ability = flying_ability def fly(self): return self.flying_ability.fly()class FlyingAbility: def fly(self): return "Flying"robot = Robot(FlyingAbility())print(robot.fly()) # Flying2. 遵循 SOLID 原则# 单一职责原则class User: def __init__(self, name, email): self.name = name self.email = emailclass UserRepository: def save(self, user): passclass EmailService: def send_email(self, user, message): pass3. 使用类型提示from typing import List, Optionalclass Product: def __init__(self, name: str, price: float): self.name = name self.price = price def get_discounted_price(self, discount: float) -> float: return self.price * (1 - discount)class ShoppingCart: def __init__(self): self.items: List[Product] = [] def add_item(self, product: Product) -> None: self.items.append(product) def get_total(self) -> float: return sum(item.price for item in self.items)总结Python 面向对象编程的核心概念:类与对象:类是模板,对象是实例属性与方法:类属性共享,实例属性独立;实例方法、类方法、静态方法继承:单继承、多继承、方法解析顺序(MRO)多态:不同对象对同一消息的不同响应封装:隐藏内部实现,提供公共接口抽象类:定义接口规范,强制子类实现特殊方法:实现运算符重载、上下文管理等设计模式:单例、工厂等常用模式掌握面向对象编程,能够编写出结构清晰、易于维护和扩展的代码。
阅读 0·2月21日 17:10

Python 中的列表推导式和生成器表达式有什么区别?

Python 生成器表达式与列表推导式详解列表推导式基本语法列表推导式是一种简洁的创建列表的方式,它将循环和条件判断结合在一起。# 基本列表推导式numbers = [1, 2, 3, 4, 5]# 传统方式squares = []for num in numbers: squares.append(num ** 2)# 列表推导式squares = [num ** 2 for num in numbers]print(squares) # [1, 4, 9, 16, 25]带条件的列表推导式# 带过滤条件的列表推导式numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]# 获取偶数evens = [num for num in numbers if num % 2 == 0]print(evens) # [2, 4, 6, 8, 10]# 获取大于5的奇数odd_gt_5 = [num for num in numbers if num % 2 == 1 and num > 5]print(odd_gt_5) # [7, 9]# 使用 if-else 表达式result = ["偶数" if num % 2 == 0 else "奇数" for num in numbers[:5]]print(result) # ['奇数', '偶数', '奇数', '偶数', '奇数']嵌套列表推导式# 嵌套列表推导式matrix = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]# 展平二维列表flattened = [item for row in matrix for item in row]print(flattened) # [1, 2, 3, 4, 5, 6, 7, 8, 9]# 转置矩阵transposed = [[row[i] for row in matrix] for i in range(3)]print(transposed) # [[1, 4, 7], [2, 5, 8], [3, 6, 9]]# 创建乘法表multiplication_table = [[i * j for j in range(1, 6)] for i in range(1, 6)]for row in multiplication_table: print(row)# [1, 2, 3, 4, 5]# [2, 4, 6, 8, 10]# [3, 6, 9, 12, 15]# [4, 8, 12, 16, 20]# [5, 10, 15, 20, 25]列表推导式的实际应用# 1. 数据转换names = ["alice", "bob", "charlie"]capitalized = [name.capitalize() for name in names]print(capitalized) # ['Alice', 'Bob', 'Charlie']# 2. 数据过滤data = [1, -2, 3, -4, 5, -6, 7, -8, 9, -10]positive = [x for x in data if x > 0]print(positive) # [1, 3, 5, 7, 9]# 3. 字典键值转换user_dict = {"name": "Alice", "age": 25, "city": "New York"}keys = [key for key in user_dict.keys()]values = [value for value in user_dict.values()]items = [f"{key}: {value}" for key, value in user_dict.items()]print(keys) # ['name', 'age', 'city']print(values) # ['Alice', 25, 'New York']print(items) # ['name: Alice', 'age: 25', 'city: New York']# 4. 文件处理# 假设有一个文件包含多行文本lines = ["hello world", "python is great", "list comprehension"]words = [word for line in lines for word in line.split()]print(words) # ['hello', 'world', 'python', 'is', 'great', 'list', 'comprehension']生成器表达式基本语法生成器表达式与列表推导式语法相似,但使用圆括号而不是方括号。生成器表达式返回一个生成器对象,而不是列表。# 生成器表达式numbers = [1, 2, 3, 4, 5]# 列表推导式squares_list = [num ** 2 for num in numbers]print(squares_list) # [1, 4, 9, 16, 25]print(type(squares_list)) # <class 'list'># 生成器表达式squares_gen = (num ** 2 for num in numbers)print(squares_gen) # <generator object <genexpr> at 0x...>print(type(squares_gen)) # <class 'generator'># 使用生成器print(list(squares_gen)) # [1, 4, 9, 16, 25]生成器的惰性求值# 生成器的惰性求值特性def count(): print("生成器开始执行") for i in range(5): print(f"生成 {i}") yield i# 创建生成器gen = count()print("生成器已创建")# 逐个获取值print(f"获取值: {next(gen)}")print(f"获取值: {next(gen)}")print(f"获取值: {next(gen)}")# 输出:# 生成器已创建# 生成器开始执行# 生成 0# 获取值: 0# 生成 1# 获取值: 1# 生成 2# 获取值: 2生成器表达式的实际应用# 1. 处理大文件# 假设有一个大文件,逐行读取def read_large_file(filename): with open(filename, 'r') as f: for line in f: yield line.strip()# 使用生成器表达式处理# lines = (line for line in read_large_file('large_file.txt'))# long_lines = [line for line in lines if len(line) > 100]# 2. 无限序列import itertools# 无限的偶数生成器evens = (i for i in itertools.count(0, 2))print(next(evens)) # 0print(next(evens)) # 2print(next(evens)) # 4# 3. 链式处理numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]# 链式生成器表达式result = ( num ** 2 for num in numbers if num % 2 == 0 if num > 4)print(list(result)) # [36, 64, 100]# 4. 内存高效的数据处理# 处理大量数据时,使用生成器可以节省内存large_data = range(1000000)# 列表推导式 - 占用大量内存# squares_list = [x ** 2 for x in large_data]# 生成器表达式 - 内存高效squares_gen = (x ** 2 for x in large_data)# 只在需要时计算print(next(squares_gen)) # 0print(next(squares_gen)) # 1列表推导式 vs 生成器表达式内存使用对比import sys# 列表推导式 - 立即创建所有元素list_comp = [x ** 2 for x in range(1000000)]print(f"列表推导式内存使用: {sys.getsizeof(list_comp)} bytes")# 生成器表达式 - 惰性求值,不立即创建所有元素gen_expr = (x ** 2 for x in range(1000000))print(f"生成器表达式内存使用: {sys.getsizeof(gen_expr)} bytes")# 输出:# 列表推导式内存使用: 8000056 bytes (约 8MB)# 生成器表达式内存使用: 200 bytes (非常小)性能对比import time# 测试列表推导式性能start = time.time()list_comp = [x ** 2 for x in range(1000000)]list_time = time.time() - start# 测试生成器表达式性能start = time.time()gen_expr = (x ** 2 for x in range(1000000))gen_time = time.time() - startprint(f"列表推导式创建时间: {list_time:.4f} 秒")print(f"生成器表达式创建时间: {gen_time:.4f} 秒")# 但如果需要遍历所有元素start = time.time()for _ in list_comp: passlist_iterate_time = time.time() - startstart = time.time()for _ in gen_expr: passgen_iterate_time = time.time() - startprint(f"列表推导式遍历时间: {list_iterate_time:.4f} 秒")print(f"生成器表达式遍历时间: {gen_iterate_time:.4f} 秒")使用场景对比# 适合使用列表推导式的场景# 1. 需要多次访问结果numbers = [1, 2, 3, 4, 5]squares = [x ** 2 for x in numbers]print(squares[0]) # 1print(squares[2]) # 9print(squares[4]) # 25# 2. 需要索引访问for i, value in enumerate(squares): print(f"索引 {i}: {value}")# 3. 需要切片操作print(squares[1:4]) # [4, 9, 16]# 适合使用生成器表达式的场景# 1. 处理大数据集large_numbers = range(10000000)squares_gen = (x ** 2 for x in large_numbers)# 2. 只需要遍历一次total = sum(x ** 2 for x in range(1000000))print(f"总和: {total}")# 3. 链式操作result = ( x ** 2 for x in range(100) if x % 2 == 0)result = (x + 1 for x in result)result = (x * 2 for x in result)print(list(result)[:5]) # [2, 18, 50, 98, 162]高级应用1. 使用生成器表达式实现管道# 数据处理管道def pipeline(data, *functions): """创建数据处理管道""" result = data for func in functions: result = func(result) return result# 定义处理函数def filter_even(numbers): return (num for num in numbers if num % 2 == 0)def square(numbers): return (num ** 2 for num in numbers)def add_one(numbers): return (num + 1 for num in numbers)# 使用管道numbers = range(10)result = pipeline(numbers, filter_even, square, add_one)print(list(result)) # [1, 5, 17, 37, 65]2. 使用生成器表达式处理文件# 假设有一个日志文件# log.txt:# 2024-01-01 10:00:00 INFO User logged in# 2024-01-01 10:01:00 ERROR Connection failed# 2024-01-01 10:02:00 INFO User logged out# 2024-01-01 10:03:00 ERROR Timeout occurred# 使用生成器表达式处理日志文件def process_log_file(filename): """处理日志文件,提取错误信息""" with open(filename, 'r') as f: # 生成器表达式:只提取错误行 errors = ( line.strip() for line in f if 'ERROR' in line ) # 进一步处理 error_messages = ( line.split('ERROR ')[1] for line in errors ) return list(error_messages)# errors = process_log_file('log.txt')# print(errors) # ['Connection failed', 'Timeout occurred']3. 使用列表推导式创建复杂结构# 创建字典keys = ['name', 'age', 'city']values = ['Alice', 25, 'New York']person_dict = {keys[i]: values[i] for i in range(len(keys))}print(person_dict) # {'name': 'Alice', 'age': 25, 'city': 'New York'}# 创建嵌套字典users = [ {'name': 'Alice', 'age': 25}, {'name': 'Bob', 'age': 30}, {'name': 'Charlie', 'age': 35}]user_index = {user['name']: user['age'] for user in users}print(user_index) # {'Alice': 25, 'Bob': 30, 'Charlie': 35}# 创建集合numbers = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4]unique_numbers = {num for num in numbers}print(unique_numbers) # {1, 2, 3, 4}# 创建元组coordinates = [(x, y) for x in range(3) for y in range(3)]print(coordinates)# [(0, 0), (0, 1), (0, 2), (1, 0), (1, 1), (1, 2), (2, 0), (2, 1), (2, 2)]4. 使用生成器表达式实现无限序列# 斐波那契数列生成器def fibonacci(): a, b = 0, 1 while True: yield a a, b = b, a + b# 获取前10个斐波那契数fib = fibonacci()first_10 = [next(fib) for _ in range(10)]print(first_10) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]# 质数生成器def primes(): """生成质数""" num = 2 while True: if all(num % i != 0 for i in range(2, int(num ** 0.5) + 1)): yield num num += 1# 获取前10个质数prime_gen = primes()first_10_primes = [next(prime_gen) for _ in range(10)]print(first_10_primes) # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]最佳实践1. 可读性优先# 好的做法 - 清晰易读numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]evens = [num for num in numbers if num % 2 == 0]# 不好的做法 - 过于复杂result = [x for x in [y ** 2 for y in range(10)] if x > 50 and x < 100]# 更好的做法 - 分步骤squares = [y ** 2 for y in range(10)]result = [x for x in squares if 50 < x < 100]2. 避免副作用# 不好的做法 - 在推导式中有副作用items = []result = [items.append(x) for x in range(10)] # 错误!# 好的做法 - 使用循环items = []for x in range(10): items.append(x)# 或者直接使用列表推导式items = [x for x in range(10)]3. 选择合适的数据结构# 需要多次访问 - 使用列表numbers = [x ** 2 for x in range(100)]print(numbers[0])print(numbers[50])print(numbers[99])# 只需要遍历一次 - 使用生成器total = sum(x ** 2 for x in range(1000000))# 需要唯一值 - 使用集合unique = {x % 10 for x in range(100)}# 需要键值对 - 使用字典mapping = {x: x ** 2 for x in range(10)}4. 考虑性能# 对于大数据集,使用生成器表达式large_data = range(10000000)# 好的做法 - 使用生成器result = sum(x ** 2 for x in large_data)# 不好的做法 - 使用列表(占用大量内存)# result = sum([x ** 2 for x in large_data])# 对于小数据集,列表推导式可能更快small_data = range(100)result = sum([x ** 2 for x in small_data])总结Python 列表推导式和生成器表达式的核心概念:列表推导式基本语法:[expression for item in iterable if condition]特点:立即创建列表,支持索引和切片适用场景:需要多次访问、需要索引操作、数据量较小生成器表达式基本语法:(expression for item in iterable if condition)特点:惰性求值,内存高效,只能遍历一次适用场景:处理大数据集、只需要遍历一次、链式操作主要区别内存使用:生成器表达式更节省内存访问方式:列表支持索引,生成器不支持重用性:列表可以多次访问,生成器只能遍历一次创建时间:生成器创建更快,但遍历时间可能更长最佳实践优先考虑可读性避免在推导式中使用副作用根据需求选择合适的数据结构考虑性能和内存使用掌握列表推导式和生成器表达式,能够编写出更简洁、更高效的 Python 代码。它们是 Python 中非常强大的特性,能够显著提高代码的可读性和性能。
阅读 0·2月21日 17:10

Python 中的描述符是什么?如何使用?

Python 描述符详解描述符的基本概念描述符是 Python 中实现属性访问控制的强大机制。描述符协议包含三个方法:__get__、__set__ 和 __delete__。任何实现了这些方法的对象都可以作为描述符使用。描述符协议class Descriptor: def __get__(self, obj, objtype=None): """获取属性值""" pass def __set__(self, obj, value): """设置属性值""" pass def __delete__(self, obj): """删除属性""" pass数据描述符 vs 非数据描述符数据描述符实现了 __get__ 和 __set__ 方法的描述符称为数据描述符。class DataDescriptor: def __init__(self, initial_value=None): self.value = initial_value def __get__(self, obj, objtype=None): print(f"获取数据描述符值: {self.value}") return self.value def __set__(self, obj, value): print(f"设置数据描述符值: {value}") self.value = valueclass MyClass: attr = DataDescriptor(42)obj = MyClass()print(obj.attr) # 获取数据描述符值: 42obj.attr = 100 # 设置数据描述符值: 100print(obj.attr) # 获取数据描述符值: 100非数据描述符只实现了 __get__ 方法的描述符称为非数据描述符。class NonDataDescriptor: def __init__(self, initial_value=None): self.value = initial_value def __get__(self, obj, objtype=None): print(f"获取非数据描述符值: {self.value}") return self.valueclass MyClass: attr = NonDataDescriptor(42)obj = MyClass()print(obj.attr) # 获取非数据描述符值: 42obj.attr = 100 # 设置实例属性,不调用 __set__print(obj.attr) # 100(实例属性优先)数据描述符 vs 非数据描述符的优先级class DataDesc: def __get__(self, obj, objtype=None): return "数据描述符" def __set__(self, obj, value): passclass NonDataDesc: def __get__(self, obj, objtype=None): return "非数据描述符"class MyClass: data_desc = DataDesc() non_data_desc = NonDataDesc()obj = MyClass()obj.data_desc = "实例值"obj.non_data_desc = "实例值"print(obj.data_desc) # 数据描述符(数据描述符优先)print(obj.non_data_desc) # 实例值(实例属性优先)描述符的实际应用1. 类型检查class Typed: """类型检查描述符""" def __init__(self, name, expected_type): self.name = name self.expected_type = expected_type def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(self.name) def __set__(self, obj, value): if not isinstance(value, self.expected_type): raise TypeError( f"属性 {self.name} 应该是 {self.expected_type} 类型," f"但得到的是 {type(value)}" ) obj.__dict__[self.name] = valueclass Person: name = Typed('name', str) age = Typed('age', int)person = Person()person.name = "Alice" # 正常person.age = 25 # 正常# person.age = "25" # TypeError: 属性 age 应该是 <class 'int'> 类型,但得到的是 <class 'str'>2. 值验证class Validated: """值验证描述符""" def __init__(self, name, validator): self.name = name self.validator = validator def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(self.name) def __set__(self, obj, value): if not self.validator(value): raise ValueError(f"属性 {self.name} 的值 {value} 无效") obj.__dict__[self.name] = valueclass Person: age = Validated('age', lambda x: isinstance(x, int) and 0 <= x <= 150) name = Validated('name', lambda x: isinstance(x, str) and len(x) > 0)person = Person()person.age = 25 # 正常person.name = "Alice" # 正常# person.age = -5 # ValueError: 属性 age 的值 -5 无效# person.name = "" # ValueError: 属性 name 的值 无效3. 延迟计算class LazyProperty: """延迟计算属性描述符""" def __init__(self, func): self.func = func self.name = func.__name__ def __get__(self, obj, objtype=None): if obj is None: return self # 检查是否已经计算过 if self.name not in obj.__dict__: print(f"延迟计算 {self.name}") obj.__dict__[self.name] = self.func(obj) return obj.__dict__[self.name]class Circle: def __init__(self, radius): self.radius = radius @LazyProperty def area(self): print("计算面积...") return 3.14159 * self.radius ** 2 @LazyProperty def circumference(self): print("计算周长...") return 2 * 3.14159 * self.radiuscircle = Circle(5)print(circle.area) # 延迟计算 area, 计算面积..., 78.53975print(circle.area) # 78.53975(直接返回缓存值)print(circle.circumference) # 延迟计算 circumference, 计算周长..., 31.41594. 只读属性class ReadOnly: """只读属性描述符""" def __init__(self, name, value): self.name = name self.value = value def __get__(self, obj, objtype=None): if obj is None: return self return self.value def __set__(self, obj, value): raise AttributeError(f"属性 {self.name} 是只读的")class Config: VERSION = ReadOnly('VERSION', '1.0.0') AUTHOR = ReadOnly('AUTHOR', 'Alice')config = Config()print(config.VERSION) # 1.0.0# config.VERSION = '2.0.0' # AttributeError: 属性 VERSION 是只读的5. 属性访问日志class Logged: """属性访问日志描述符""" def __init__(self, name): self.name = name def __get__(self, obj, objtype=None): if obj is None: return self value = obj.__dict__.get(self.name) print(f"读取属性 {self.name}: {value}") return value def __set__(self, obj, value): print(f"设置属性 {self.name}: {value}") obj.__dict__[self.name] = value def __delete__(self, obj): print(f"删除属性 {self.name}") del obj.__dict__[self.name]class Person: name = Logged('name') age = Logged('age')person = Person()person.name = "Alice" # 设置属性 name: Aliceperson.age = 25 # 设置属性 age: 25print(person.name) # 读取属性 name: Alicedel person.age # 删除属性 age6. 缓存属性class Cached: """缓存属性描述符""" def __init__(self, func): self.func = func self.name = func.__name__ self.cache = {} def __get__(self, obj, objtype=None): if obj is None: return self # 使用对象 ID 作为缓存键 obj_id = id(obj) if obj_id not in self.cache: print(f"计算并缓存 {self.name}") self.cache[obj_id] = self.func(obj) else: print(f"使用缓存 {self.name}") return self.cache[obj_id]class ExpensiveCalculator: def __init__(self, base): self.base = base @Cached def expensive_operation(self): print("执行耗时操作...") import time time.sleep(1) return self.base ** 2calc = ExpensiveCalculator(5)print(calc.expensive_operation) # 计算并缓存 expensive_operation, 执行耗时操作..., 25print(calc.expensive_operation) # 使用缓存 expensive_operation, 25描述符与 property 的关系property 本质上是描述符# property 实际上是一个描述符类class MyClass: @property def my_property(self): return "property 值" @my_property.setter def my_property(self, value): print(f"设置 property: {value}")obj = MyClass()print(obj.my_property) # property 值obj.my_property = "新值" # 设置 property: 新值自定义 property 类class MyProperty: """自定义 property 类""" def __init__(self, fget=None, fset=None, fdel=None, doc=None): self.fget = fget self.fset = fset self.fdel = fdel self.__doc__ = doc def __get__(self, obj, objtype=None): if obj is None: return self if self.fget is None: raise AttributeError("不可读") return self.fget(obj) def __set__(self, obj, value): if self.fset is None: raise AttributeError("不可写") self.fset(obj, value) def __delete__(self, obj): if self.fdel is None: raise AttributeError("不可删除") self.fdel(obj) def getter(self, fget): self.fget = fget return self def setter(self, fset): self.fset = fset return self def deleter(self, fdel): self.fdel = fdel return selfclass Person: def __init__(self): self._name = "" @MyProperty def name(self): return self._name @name.setter def name(self, value): self._name = valueperson = Person()person.name = "Alice"print(person.name) # Alice描述符的高级用法描述符与类方法class ClassMethodDescriptor: """类方法描述符""" def __init__(self, func): self.func = func def __get__(self, obj, objtype=None): if objtype is None: objtype = type(obj) return self.func.__get__(objtype, objtype)class MyClass: @ClassMethodDescriptor def class_method(cls): return f"类方法: {cls.__name__}"print(MyClass.class_method()) # 类方法: MyClass描述符与静态方法class StaticMethodDescriptor: """静态方法描述符""" def __init__(self, func): self.func = func def __get__(self, obj, objtype=None): return self.funcclass MyClass: @StaticMethodDescriptor def static_method(): return "静态方法"print(MyClass.static_method()) # 静态方法描述符链class ValidatedTyped: """验证和类型检查组合描述符""" def __init__(self, name, expected_type, validator=None): self.name = name self.expected_type = expected_type self.validator = validator def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(self.name) def __set__(self, obj, value): # 类型检查 if not isinstance(value, self.expected_type): raise TypeError( f"属性 {self.name} 应该是 {self.expected_type} 类型" ) # 值验证 if self.validator and not self.validator(value): raise ValueError(f"属性 {self.name} 的值 {value} 无效") obj.__dict__[self.name] = valueclass Person: age = ValidatedTyped( 'age', int, lambda x: 0 <= x <= 150 ) name = ValidatedTyped( 'name', str, lambda x: len(x) > 0 )person = Person()person.name = "Alice"person.age = 25# person.age = "25" # TypeError# person.age = -5 # ValueError描述符的最佳实践1. 使用 set_name 方法(Python 3.6+)class Descriptor: """使用 __set_name__ 的描述符""" def __set_name__(self, owner, name): self.name = name self.private_name = f'_{name}' def __get__(self, obj, objtype=None): if obj is None: return self return getattr(obj, self.private_name) def __set__(self, obj, value): setattr(obj, self.private_name, value)class MyClass: attr = Descriptor()obj = MyClass()obj.attr = 42print(obj.attr) # 422. 避免描述符中的循环引用import weakrefclass WeakRefDescriptor: """使用弱引用的描述符""" def __init__(self): self.instances = weakref.WeakKeyDictionary() def __get__(self, obj, objtype=None): if obj is None: return self return self.instances.get(obj) def __set__(self, obj, value): self.instances[obj] = valueclass MyClass: attr = WeakRefDescriptor()obj = MyClass()obj.attr = 42print(obj.attr) # 423. 提供清晰的错误信息class ValidatedDescriptor: """提供清晰错误信息的描述符""" def __init__(self, name, expected_type): self.name = name self.expected_type = expected_type def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(self.name) def __set__(self, obj, value): if not isinstance(value, self.expected_type): raise TypeError( f"在类 {obj.__class__.__name__} 中," f"属性 '{self.name}' 应该是 {self.expected_type.__name__} 类型," f"但得到的是 {type(value).__name__}" ) obj.__dict__[self.name] = valueclass Person: age = ValidatedDescriptor('age', int)person = Person()# person.age = "25" # TypeError: 在类 Person 中,属性 'age' 应该是 int 类型,但得到的是 str描述符的实际应用案例1. ORM 模型字段class Field: """ORM 字段描述符""" def __init__(self, field_type, primary_key=False): self.field_type = field_type self.primary_key = primary_key self.name = None def __set_name__(self, owner, name): self.name = name def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(f'_{self.name}') def __set__(self, obj, value): if not isinstance(value, self.field_type): raise TypeError(f"字段 {self.name} 类型错误") obj.__dict__[f'_{self.name}'] = valueclass ModelMeta(type): """模型元类""" def __new__(cls, name, bases, attrs): fields = {} for key, value in list(attrs.items()): if isinstance(value, Field): fields[key] = value attrs['_fields'] = fields return super().__new__(cls, name, bases, attrs)class User(metaclass=ModelMeta): id = Field(int, primary_key=True) name = Field(str) age = Field(int)user = User()user.name = "Alice"user.age = 25print(user.name) # Alice2. 单位转换class Temperature: """温度单位转换描述符""" def __init__(self, name): self.name = name def __get__(self, obj, objtype=None): if obj is None: return self return obj.__dict__.get(f'_{self.name}') def __set__(self, obj, value): if isinstance(value, (int, float)): obj.__dict__[f'_{self.name}'] = value elif isinstance(value, str): if value.endswith('°C'): obj.__dict__[f'_{self.name}'] = float(value[:-2]) elif value.endswith('°F'): obj.__dict__[f'_{self.name}'] = (float(value[:-2]) - 32) * 5/9 else: raise ValueError("无效的温度格式") else: raise TypeError("无效的温度类型")class Weather: celsius = Temperature('celsius')weather = Weather()weather.celsius = "25°C"print(weather.celsius) # 25.0weather.celsius = "77°F"print(weather.celsius) # 25.0总结Python 描述符的核心概念:描述符协议:__get__、__set__、__delete__ 三个方法数据描述符:实现了 __get__ 和 __set__,优先级高于实例属性非数据描述符:只实现了 __get__,优先级低于实例属性实际应用:类型检查值验证延迟计算只读属性属性访问日志缓存属性描述符的优势:强大的属性访问控制可重用的属性逻辑清晰的代码组织与 Python 内置机制集成描述符的最佳实践:使用 __set_name__ 方法(Python 3.6+)避免循环引用,使用弱引用提供清晰的错误信息理解描述符的优先级规则考虑使用更简单的替代方案(property)描述符是 Python 中实现高级属性控制的核心机制,理解描述符对于深入掌握 Python 的面向对象编程非常重要。property、classmethod、staticmethod 等都是基于描述符实现的。
阅读 0·2月21日 17:10

Python 中多线程和多进程有什么区别?分别在什么场景下使用?

Python 多线程与多进程详解线程与进程的基本概念进程(Process)进程是操作系统分配资源的基本单位,拥有独立的内存空间、文件句柄等系统资源。每个进程都有自己独立的地址空间,进程间通信需要特殊的机制(IPC)。线程(Thread)线程是 CPU 调度的基本单位,同一进程内的线程共享进程的内存空间和资源。线程间的通信相对简单,但需要处理同步问题。Python 中的多线程threading 模块Python 的 threading 模块提供了线程相关的操作。import threadingimport timedef worker(name, delay): print(f"线程 {name} 开始") time.sleep(delay) print(f"线程 {name} 结束")# 创建线程t1 = threading.Thread(target=worker, args=("Thread-1", 2))t2 = threading.Thread(target=worker, args=("Thread-2", 1))# 启动线程t1.start()t2.start()# 等待线程结束t1.join()t2.join()print("主线程结束")线程同步由于多个线程共享内存,需要使用同步机制来避免竞态条件。1. Lock(锁)import threadingcounter = 0lock = threading.Lock()def increment(): global counter for _ in range(100000): with lock: # 自动获取和释放锁 counter += 1threads = []for _ in range(10): t = threading.Thread(target=increment) threads.append(t) t.start()for t in threads: t.join()print(f"计数器值: {counter}") # 应该是 10000002. RLock(可重入锁)import threadinglock = threading.RLock()def recursive_function(n): with lock: if n > 0: print(f"递归深度: {n}") recursive_function(n - 1)recursive_function(5)3. Semaphore(信号量)import threadingimport timesemaphore = threading.Semaphore(3) # 最多允许 3 个线程同时执行def worker(worker_id): with semaphore: print(f"Worker {worker_id} 开始工作") time.sleep(2) print(f"Worker {worker_id} 完成工作")threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]for t in threads: t.start()for t in threads: t.join()4. Event(事件)import threadingimport timeevent = threading.Event()def waiter(): print("等待事件...") event.wait() # 等待事件被设置 print("事件已触发,继续执行")def setter(): time.sleep(2) print("设置事件") event.set() # 触发事件t1 = threading.Thread(target=waiter)t2 = threading.Thread(target=setter)t1.start()t2.start()t1.join()t2.join()5. Condition(条件变量)import threadingimport timeimport randomcondition = threading.Condition()queue = []def producer(): for i in range(5): time.sleep(random.random()) with condition: item = f"Item-{i}" queue.append(item) print(f"生产: {item}") condition.notify() # 通知等待的消费者def consumer(): for _ in range(5): with condition: while not queue: print("等待产品...") condition.wait() # 等待产品 item = queue.pop(0) print(f"消费: {item}")t1 = threading.Thread(target=producer)t2 = threading.Thread(target=consumer)t1.start()t2.start()t1.join()t2.join()Python 中的多进程multiprocessing 模块Python 的 multiprocessing 模块提供了多进程支持,每个进程有独立的 Python 解释器和 GIL。import multiprocessingimport timedef worker(name, delay): print(f"进程 {name} 开始") time.sleep(delay) print(f"进程 {name} 结束")if __name__ == '__main__': # 创建进程 p1 = multiprocessing.Process(target=worker, args=("Process-1", 2)) p2 = multiprocessing.Process(target=worker, args=("Process-2", 1)) # 启动进程 p1.start() p2.start() # 等待进程结束 p1.join() p2.join() print("主进程结束")进程池(Process Pool)import multiprocessingdef square(x): return x * xif __name__ == '__main__': # 创建进程池 with multiprocessing.Pool(processes=4) as pool: # 并行计算 numbers = list(range(10)) results = pool.map(square, numbers) print(f"结果: {results}") # 异步计算 async_results = [pool.apply_async(square, (x,)) for x in numbers] results = [r.get() for r in async_results] print(f"异步结果: {results}")进程间通信(IPC)1. Queue(队列)import multiprocessingdef producer(queue): for i in range(5): queue.put(f"Message-{i}") queue.put("STOP")def consumer(queue): while True: message = queue.get() if message == "STOP": break print(f"收到: {message}")if __name__ == '__main__': queue = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,)) p1.start() p2.start() p1.join() p2.join()2. Pipe(管道)import multiprocessingdef sender(conn): conn.send("Hello from sender") conn.close()def receiver(conn): message = conn.recv() print(f"收到消息: {message}")if __name__ == '__main__': parent_conn, child_conn = multiprocessing.Pipe() p1 = multiprocessing.Process(target=sender, args=(child_conn,)) p2 = multiprocessing.Process(target=receiver, args=(parent_conn,)) p1.start() p2.start() p1.join() p2.join()3. Value 和 Array(共享内存)import multiprocessingdef increment(counter): with counter.get_lock(): counter.value += 1if __name__ == '__main__': counter = multiprocessing.Value('i', 0) processes = [multiprocessing.Process(target=increment, args=(counter,)) for _ in range(100)] for p in processes: p.start() for p in processes: p.join() print(f"计数器值: {counter.value}")多线程 vs 多进程性能对比import threadingimport multiprocessingimport timedef cpu_bound_task(n): result = 0 for i in range(n): result += i ** 2 return resultdef test_multithreading(): start = time.time() threads = [threading.Thread(target=cpu_bound_task, args=(1000000,)) for _ in range(4)] for t in threads: t.start() for t in threads: t.join() print(f"多线程耗时: {time.time() - start:.4f} 秒")def test_multiprocessing(): start = time.time() with multiprocessing.Pool(processes=4) as pool: results = pool.map(cpu_bound_task, [1000000] * 4) print(f"多进程耗时: {time.time() - start:.4f} 秒")if __name__ == '__main__': test_multithreading() test_multiprocessing()对比总结| 特性 | 多线程 | 多进程 ||------|--------|--------|| 内存使用 | 共享内存,占用少 | 独立内存,占用多 || 通信方式 | 共享变量、队列等 | Queue、Pipe、共享内存 || 创建开销 | 小 | 大 || 数据共享 | 容易,但需要同步 | 困难,需要 IPC || GIL 影响 | 受 GIL 限制 | 不受 GIL 限制 || 适用场景 | I/O 密集型 | CPU 密集型 || 稳定性 | 一个线程崩溃可能影响整个进程 | 进程间隔离,更稳定 |实际应用场景1. I/O 密集型任务 - 使用多线程import threadingimport requestsimport timedef download_url(url): try: response = requests.get(url) print(f"下载完成: {url}, 大小: {len(response.content)} 字节") except Exception as e: print(f"下载失败: {url}, 错误: {e}")urls = [ "https://www.example.com", "https://www.google.com", "https://www.github.com", "https://www.python.org",]start = time.time()threads = [threading.Thread(target=download_url, args=(url,)) for url in urls]for t in threads: t.start()for t in threads: t.join()print(f"总耗时: {time.time() - start:.4f} 秒")2. CPU 密集型任务 - 使用多进程import multiprocessingimport timedef process_image(image_data): # 模拟图像处理 result = sum(i ** 2 for i in range(len(image_data))) return resultdef process_images_concurrently(images): with multiprocessing.Pool(processes=4) as pool: results = pool.map(process_image, images) return resultsif __name__ == '__main__': images = [list(range(10000)) for _ in range(10)] start = time.time() results = process_images_concurrently(images) print(f"多进程处理耗时: {time.time() - start:.4f} 秒")3. 混合使用 - 多进程 + 多线程import multiprocessingimport threadingimport timedef io_task(url): # 模拟 I/O 操作 time.sleep(0.1) return f"Processed {url}"def worker(urls): # 每个进程内部使用多线程处理 I/O threads = [threading.Thread(target=io_task, args=(url,)) for url in urls] for t in threads: t.start() for t in threads: t.join()if __name__ == '__main__': all_urls = [f"url-{i}" for i in range(20)] # 将 URL 分组 url_groups = [all_urls[i:i+5] for i in range(0, len(all_urls), 5)] start = time.time() processes = [multiprocessing.Process(target=worker, args=(group,)) for group in url_groups] for p in processes: p.start() for p in processes: p.join() print(f"混合模式耗时: {time.time() - start:.4f} 秒")最佳实践1. 选择合适的并发模型# I/O 密集型 - 使用多线程import threadingimport requestsdef fetch_url(url): response = requests.get(url) return response.texturls = ["url1", "url2", "url3"]threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]for t in threads: t.start()for t in threads: t.join()# CPU 密集型 - 使用多进程import multiprocessingdef compute-intensive(data): return sum(x ** 2 for x in data)data_chunks = [chunk1, chunk2, chunk3, chunk4]with multiprocessing.Pool(4) as pool: results = pool.map(compute-intensive, data_chunks)2. 避免过度并发# 不好的做法 - 创建过多线程threads = [threading.Thread(target=task) for _ in range(1000)]# 好的做法 - 使用线程池from concurrent.futures import ThreadPoolExecutorwith ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(task) for _ in range(1000)] results = [f.result() for f in futures]3. 正确处理异常import threadingimport tracebackdef worker(): try: # 执行任务 result = do_something() except Exception as e: print(f"线程异常: {e}") traceback.print_exc()threads = [threading.Thread(target=worker) for _ in range(10)]for t in threads: t.start()for t in threads: t.join()4. 使用上下文管理器# 好的做法 - 自动清理资源from concurrent.futures import ThreadPoolExecutordef process_item(item): return item * 2with ThreadPoolExecutor(max_workers=4) as executor: results = list(executor.map(process_item, range(100)))# 线程池自动关闭总结多线程适合 I/O 密集型任务内存占用少,创建开销小受 GIL 限制,无法充分利用多核需要处理线程同步问题多进程适合 CPU 密集型任务可以充分利用多核 CPU内存占用大,创建开销大进程间隔离,更稳定选择建议I/O 密集型:优先使用多线程或异步编程CPU 密集型:使用多进程混合型:结合多进程和多线程简单任务:考虑使用 concurrent.futures 模块理解多线程和多进程的特点,根据实际需求选择合适的并发模型,才能编写出高效、稳定的并发程序。
阅读 0·2月21日 17:10

TCP 流量控制的滑动窗口机制是如何工作的?

TCP 流量控制机制详解TCP 流量控制是防止发送方发送速度过快导致接收方缓冲区溢出的关键机制。滑动窗口机制窗口大小接收窗口(rwnd):接收方通告的可用缓冲区大小拥塞窗口(cwnd):发送方根据网络状况计算的窗口大小实际发送窗口:min(rwnd, cwnd),即取两者中的较小值工作原理接收方通告:在 TCP 报文段的窗口字段中通告当前可用的接收窗口大小发送方调整:发送方根据接收方的窗口大小调整发送速率零窗口:当接收方缓冲区满时,通告窗口大小为 0窗口探测:发送方定期发送零窗口探测报文,查询接收方窗口是否可用流量控制过程正常情况接收方有足够缓冲区时,窗口大小保持较大值发送方可以连续发送多个报文段接收方处理数据后,更新窗口大小并通告发送方缓冲区紧张接收方缓冲区接近满时,减小窗口大小发送方相应减少发送速率避免数据丢失和重传零窗口处理接收方缓冲区满时,通告窗口大小为 0发送方停止发送数据,但保持连接定期发送零窗口探测(1 字节数据)接收方有空间后,更新窗口大小窗口更新的时机接收数据后:处理完数据,释放缓冲区空间应用层读取数据:从接收缓冲区读取数据后零窗口探测响应:响应发送方的探测报文相关问题流量控制和拥塞控制的区别是什么?为什么需要零窗口探测机制?滑动窗口如何影响 TCP 的吞吐量?
阅读 0·2月21日 17:09