首页
小游戏
壁纸
留言
视频
友链
关于
Search
1
上海市第八人民医院核酸检测攻略(时间+预约+报告)-上海
299 阅读
2
上海烟花销售点一览表2022-上海
241 阅读
3
新款的 Thinkbook 16+ 值不值得买?-知乎热搜
219 阅读
4
如何看待网传小米 MIUI 13 内置国家反诈中心 APP?-知乎热搜
214 阅读
5
窦唯到底厉害在哪里?-知乎热搜
192 阅读
免费代理IP
免费翻墙节点
文章聚合
掘金
知乎
IT之家
本地宝
观察者网
金山词霸
搜韵网
新华网
其他
登录
/
注册
Search
标签搜索
知乎热搜
IT之家热榜
广州
深圳
北京
观察者网头条
前端
上海
后端
知乎日报
Android
iOS
人工智能
阅读
工具资源
杭州
诗词日历
每日一句
郑州
设计
看啥
累计撰写
129,720
篇文章
累计收到
46
条评论
首页
栏目
免费代理IP
免费翻墙节点
文章聚合
掘金
知乎
IT之家
本地宝
观察者网
金山词霸
搜韵网
新华网
其他
页面
小游戏
壁纸
留言
视频
友链
关于
搜索到
1666
篇与
的结果
2022-10-20
付费版 VS Code?脑瓜子嗡嗡的吧!-掘金
今天看到一张图,分享给小伙伴看一下:脑瓜子嗡嗡的吧!这不比每周四 v me 50, xx kfc 来的快,还是给你打折限时活动价,由 google 正版授权的微软开源 vscode。并且还应该会有人买吧!可能是为了知识付费。想不到吧!这还上了 github,还有人专门给 vscode github 提了 issue:https://github.com/microsoft/vscode/issues/163798更让我长知识的是,竟然这样做,唯一侵犯的只有商标权?是不是又发现了新的财路。不过下面也有人反驳他,具体对不对,我也不知道。我使用 Mac 搜索 VS Code 前几条搜索结果倒不是广告,难道是 Windows 才会这样么?感兴趣的小伙伴可以去看看。不过说真的,下载软件开发,也就是骗骗小白,不过这钱赚的,有点考验智商!看来反诈 APP 有必要出 PC 版了,看留言区,竟然有人为了装 vscode、java 等,还专门装了 xxx 软件管家。最后,补充一下,下载软件去官网就行,真的不用脑瓜子嗡嗡的!网络不行,找朋友、网友、群友传一个安装包也比 xxx度 靠谱。本文由mdnice多平台发布以上文章来自[掘金]-[程序员小航]本程序使用github开源项目RSSHub提取聚合!
2022年10月20日
0 阅读
0 评论
0 点赞
2022-10-20
好好的系统,为什么要分库分表?-掘金
本文为掘金社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!大家好,我是小富~说在前边今天是《分库分表 ShardingSphere 原理与实战》系列的开篇文章,之前写过几篇关于分库分表的文章反响都还不错,到现在公众号:程序员小富后台不断的有人留言、咨询分库分表的问题,我也没想到大家对于分库分表的话题会这么感兴趣,可能很多人的工作内容业务量较小很难接触到这方面的技能。这个系列在我脑子里筹划了挺久的,奈何手说啥也不干活,就一直拖到了现在。其实网上关于分库分表相关的文章很多,但我还是坚持出这个系列,主要是自己学习研究,顺便给分享,对于一个知识,不同的人从不同的角度理解的不尽相同。网上的资料看似很多,不过值得学有价值的得仔细挑,很多时候在筛选甄别的过程中,逐渐的磨灭了本就不高的学习热情。搬运抄袭雷同的东西太多,而且知识点又都比较零碎,很少有细致的原理实战案例。对新手来说妥妥的从入门到放弃,即便有成体系的基本上几篇后就断更了(希望我不会吧!)。我不太喜欢堆砌名词概念,熟悉我的朋友不难发现,我的文章从来都是讲完原理紧跟着来一波实战操作。学习技术原理必须配合实操巩固一下,不然三天半不到忘得干干净净,纯纯的经验之谈。上图是我初步罗列的ShardingSphere提纲,在官网文档基础上补充了很多基础知识,这个系列会用几十篇文章,详细的梳理分库分表基础理论,手把手的实战ShardingSphere 5.X框架的功能和解读源码,以及开发中容易踩坑的点,每篇附带代码案例demo,旨在让新手也能看的懂,后续系列完结全部内容会整理成PDF分享给大家,期待一下吧!话不多说,咱们这就进入正题~不急于上手实战ShardingSphere框架,先来复习下分库分表的基础概念,技术名词大多晦涩难懂,不要死记硬背理解最重要,当你捅破那层窗户纸,发现其实它也就那么回事。什么是分库分表分库分表是在海量数据下,由于单库、表数据量过大,导致数据库性能持续下降的问题,演变出的技术方案。分库分表是由分库和分表这两个独立概念组成的,只不过通常分库与分表的操作会同时进行,以至于我们习惯性的将它们合在一起叫做分库分表。通过一定的规则,将原本数据量大的数据库拆分成多个单独的数据库,将原本数据量大的表拆分成若干个数据表,使得单一的库、表性能达到最优的效果(响应速度快),以此提升整体数据库性能。为什么分库分表单机数据库的存储能力、连接数是有限的,它自身就很容易会成为系统的瓶颈。当单表数据量在百万以里时,我们还可以通过添加从库、优化索引提升性能。一旦数据量朝着千万以上趋势增长,再怎么优化数据库,很多操作性能仍下降严重。为了减少数据库的负担,提升数据库响应速度,缩短查询时间,这时候就需要进行分库分表。为什么需要分库?容量我们给数据库实例分配的磁盘容量是固定的,数据量持续的大幅增长,用不了多久单机的容量就会承载不了这么多数据,解决办法简单粗暴,加容量!连接数单机的容量可以随意扩展,但数据库的连接数却是有限的,在高并发场景下多个业务同时对一个数据库操作,很容易将连接数耗尽导致too many connections报错,导致后续数据库无法正常访问。可以通过max_connections查看MySQL最大连接数。show variables like '%max_connections%' 将原本单数据库按不同业务拆分成订单库、物流库、积分库等不仅可以有效分摊数据库读写压力,也提高了系统容错性。为什么需要分表?做过报表业务的同学应该都体验过,一条SQL执行时间超过几十秒的场景。导致数据库查询慢的原因有很多,SQL没命中索引、like扫全表、用了函数计算,这些都可以通过优化手段解决,可唯独数据量大是MySQL无法通过自身优化解决的。慢的根本原因是InnoDB存储引擎,聚簇索引结构的 B+tree 层级变高,磁盘IO变多查询性能变慢,详细原理自行查找一下,这里不用过多篇幅说明。阿里的开发手册中有条建议,单表行数超500万行或者单表容量超过2GB,就推荐分库分表,然而理想和实现总是有差距的,阿里这种体量的公司不差钱当然可以这么用,实际上很多公司单表数据几千万、亿级别仍然不选择分库分表。什么时候分库分表技术群里经常会有小伙伴问,到底什么情况下会用分库分表呢?分库分表要解决的是现存海量数据访问的性能瓶颈,对持续激增的数据量所做出的架构预见性。是否分库分表的关键指标是数据量,我们以fire100.top这个网站的资源表 t_resource为例,系统在运行初始的时候,每天只有可怜的几十个资源上传,这时使用单库、单表的方式足以支持系统的存储,数据量小几乎没什么数据库性能瓶颈。但某天开始一股神秘的流量进入,系统每日产生的资源数据量暴增至十万甚至上百万级别,这时资源表数据量到达千万级,查询响应变得缓慢,数据库的性能瓶颈逐渐显现。以MySQL数据库为例,单表的数据量在达到亿条级别,通过加索引、SQL调优等传统优化策略,性能提升依旧微乎其微时,就可以考虑做分库分表了。既然MySQL存储海量数据时会出现性能瓶颈,那么我们是不是可以考虑用其他方案替代它?比如高性能的非关系型数据库MongoDB?可以,但要看存储的数据类型!现在互联网上大部分公司的核心数据几乎是存储在关系型数据库(MySQL、Oracle等),因为它们有着NoSQL如法比拟的稳定性和可靠性,产品成熟生态系统完善,还有核心的事务功能特性,也是其他存储工具不具备的,而评论、点赞这些非核心数据还是可以考虑用MongoDB的。如何分库分表分库分表的核心就是对数据的分片(Sharding)并相对均匀的路由在不同的库、表中,以及分片后对数据的快速定位与检索结果的整合。分库与分表可以从:垂直(纵向)和 水平(横向)两种纬度进行拆分。下边我们以经典的订单业务举例,看看如何拆分。垂直拆分1、垂直分库垂直分库一般来说按照业务和功能的维度进行拆分,将不同业务数据分别放到不同的数据库中,核心理念 专库专用。按业务类型对数据分离,剥离为多个数据库,像订单、支付、会员、积分相关等表放在对应的订单库、支付库、会员库、积分库。不同业务禁止跨库直连,获取对方业务数据一律通过API接口交互,这也是微服务拆分的一个重要依据。垂直分库很大程度上取决于业务的划分,但有时候业务间的划分并不是那么清晰,比如:电商中订单数据的拆分,其他很多业务都依赖于订单数据,有时候界线不是很好划分。垂直分库把一个库的压力分摊到多个库,提升了一些数据库性能,但并没有解决由于单表数据量过大导致的性能问题,所以就需要配合后边的分表来解决。2、垂直分表垂直分表针对业务上字段比较多的大表进行的,一般是把业务宽表中比较独立的字段,或者不常用的字段拆分到单独的数据表中,是一种大表拆小表的模式。例如:一张t_order订单表上有几十个字段,其中订单金额相关字段计算频繁,为了不影响订单表t_order的性能,就可以把订单金额相关字段拆出来单独维护一个t_order_price_expansion扩展表,这样每张表只存储原表的一部分字段,通过订单号order_no做关联,再将拆分出来的表路由到不同的库中。数据库它是以行为单位将数据加载到内存中,这样拆分以后核心表大多是访问频率较高的字段,而且字段长度也都较短,因而可以加载更多数据到内存中,减少磁盘IO,增加索引查询的命中率,进一步提升数据库性能。水平拆分上边垂直分库、垂直分表后还是会存在单库、表数据量过大的问题,当我们的应用已经无法在细粒度的垂直切分时,依旧存在单库读写、存储性能瓶颈,这时就要配合水平分库、水平分表一起了。1、水平分库水平分库是把同一个表按一定规则拆分到不同的数据库中,每个库可以位于不同的服务器上,以此实现水平扩展,是一种常见的提升数据库性能的方式。例如:db_orde_1、db_order_2两个数据库内有完全相同的t_order表,我们在访问某一笔订单时可以通过对订单的订单编号取模的方式 订单编号 mod 2 (数据库实例数) ,指定该订单应该在哪个数据库中操作。这种方案往往能解决单库存储量及性能瓶颈问题,但由于同一个表被分配在不同的数据库中,数据的访问需要额外的路由工作,因此系统的复杂度也被提升了。2、水平分表水平分表是在同一个数据库内,把一张大数据量的表按一定规则,切分成多个结构完全相同表,而每个表只存原表的一部分数据。例如:一张t_order订单表有900万数据,经过水平拆分出来三个表,t_order_1、t_order_2、t_order_3,每张表存有数据300万,以此类推。水平分表尽管拆分了表,但子表都还是在同一个数据库实例中,只是解决了单一表数据量过大的问题,并没有将拆分后的表分散到不同的机器上,还在竞争同一个物理机的CPU、内存、网络IO等。要想进一步提升性能,就需要将拆分后的表分散到不同的数据库中,达到分布式的效果。数据存在哪个库的表分库分表以后会出现一个问题,一张表会出现在多个数据库里,到底该往哪个库的哪个表里存呢?上边我们多次提到过一定规则 ,其实这个规则它是一种路由算法,决定了一条数据具体应该存在哪个数据库的哪张表里。常见的有 取模算法 、范围限定算法、范围+取模算法 、预定义算法1、取模算法关键字段取模(对hash结果取余数 hash(XXX) mod N),N为数据库实例数或子表数量)是最为常见的一种路由方式。以t_order订单表为例,先给数据库从 0 到 N-1进行编号,对 t_order订单表中order_no订单编号字段进行取模hash(order_no) mod N,得到余数i。i=0存第一个库,i=1存第二个库,i=2存第三个库,以此类推。同一笔订单数据会落在同一个库、表里,查询时用相同的规则,用t_order订单编号作为查询条件,就能快速的定位到数据。优点实现简单,数据分布相对比较均匀,不易出现请求都打到一个库上的情况。缺点取模算法对集群的伸缩支持不太友好,集群中有N个数据库实·hash(user_id) mod N,当某一台机器宕机,本应该落在该数据库的请求就无法得到处理,这时宕掉的实例会被踢出集群。此时机器数减少算法发生变化hash(user_id) mod N-1,同一用户数据落在了在不同数据库中,等这台机器恢复,用user_id作为条件查询用户数据就会少一部分。2、范围限定算法范围限定算法以某些范围字段,如时间或ID区拆分。用户表t_user被拆分成t_user_1、t_user_2、t_user_3三张表,后续将user_id范围为1 ~ 1000w的用户数据放入t_user_1,1000~ 2000w放入t_user_2,2000~3000w放入t_user_3,以此类推。按日期范围划分同理。优点 单表数据量是可控的 水平扩展简单只需增加节点即可,无需对其他分片的数据进行迁移 缺点 由于连续分片可能存在数据热点,比如按时间字段分片时,如果某一段时间(双11等大促)订单骤增,存11月数据的表可能会被频繁的读写,其他分片表存储的历史数据则很少被查询,导致数据倾斜,数据库压力分摊不均匀。 3、范围 + 取模算法为了避免热点数据的问题,我们可以对上范围算法优化一下这次我们先通过范围算法定义每个库的用户表t_user只存1000w数据,第一个db_order_1库存放userId从1 ~ 1000w,第二个库1000~2000w,第三个库2000~3000w,以此类推。每个库里再把用户表t_user拆分成t_user_1、t_user_2、t_user_3等,对userd进行取模路由到对应的表中。有效的避免数据分布不均匀的问题,数据库水平扩展也简单,直接添加实例无需迁移历史数据。4、地理位置分片地理位置分片其实是一个更大的范围,按城市或者地域划分,比如华东、华北数据放在不同的分片库、表。5、预定义算法预定义算法是事先已经明确知道分库和分表的数量,可以直接将某类数据路由到指定库或表中,查询的时候亦是如此。分库分表出来的问题了解了上边分库分表的拆分方式不难发现,相比于拆分前的单库单表,系统的数据存储架构演变到现在已经变得非常复杂。看几个具有代表性的问题,比如:分页、排序、跨节点联合查询分页、排序、联合查询,这些看似普通,开发中使用频率较高的操作,在分库分表后却是让人非常头疼的问题。把分散在不同库中表的数据查询出来,再将所有结果进行汇总合并整理后提供给用户。比如:我们要查询11、12月的订单数据,如果两个月的数据是分散到了不同的数据库实例,则要查询两个数据库相关的数据,在对数据合并排序、分页,过程繁琐复杂。事务一致性分库分表后由于表分布在不同库中,不可避免会带来跨库事务问题。后续会分别以阿里的Seata和MySQL的XA协议实现分布式事务,用来比较各自的优势与不足。全局唯一的主键分库分表后数据库表的主键ID业务意义就不大了,因为无法在标识唯一一条记录,例如:多张表t_order_1、t_order_2的主键ID全部从1开始会重复,此时我们需要主动为一条记录分配一个ID,这个全局唯一的ID就叫分布式ID,发放这个ID的系统通常被叫发号器。多数据库高效治理对多个数据库以及库内大量分片表的高效治理,是非常有必要,因为像某宝这种大厂一次大促下来,订单表可能会被拆分成成千上万个t_order_n表,如果没有高效的管理方案,手动建表、排查问题是一件很恐怖的事。历史数据迁移分库分表架构落地以后,首要的问题就是如何平滑的迁移历史数据,增量数据和全量数据迁移,这又是一个比较麻烦的事情,后边详细讲。分库分表架构模式分库分表架构主要有两种模式:client客户端模式和proxy代理模式客户模式client模式指分库分表的逻辑都在你的系统应用内部进行控制,应用会将拆分后的SQL直连多个数据库进行操作,然后本地进行数据的合并汇总等操作。代理模式proxy代理模式将应用程序与MySQL数据库隔离,业务方的应用不在需要直连数据库,而是连接proxy代理服务,代理服务实现了MySQL的协议,对业务方来说代理服务就是数据库,它会将SQL分发到具体的数据库进行执行,并返回结果。该服务内有分库分表的配置,根据配置自动创建分片表。如何抉择如何选择client模式和proxy模式,我们可以从以下几个方面来简单做下比较。1、性能性能方面client模式表现的稍好一些,它是直接连接MySQL执行命令; proxy代理服务则将整个执行链路延长了,应用->代理服务->MySQL,可能导致性能有一些损耗,但两者差距并不是非常大。2、复杂度client模式在开发使用通常引入一个jar可以; proxy代理模式则需要搭建单独的服务,有一定的维护成本,既然是服务那么就要考虑高可用,毕竟应用的所有SQL都要通过它转发至MySQL。3、升级client模式分库分表一般是依赖基础架构团队的Jar包,一旦有版本升级或者Bug修改,所有应用到的项目都要跟着升级。小规模的团队服务少升级问题不大,如果是大公司服务规模大,且涉及到跨多部门,那么升级一次成本就比较高;proxy模式在升级方面优势很明显,发布新功能或者修复Bug,只要重新部署代理服务集群即可,业务方是无感知的,但要保证发布过程中服务的可用性。4、治理、监控client模式由于是内嵌在应用内,应用集群部署不太方便统一处理;proxy模式在对SQL限流、读写权限控制、监控、告警等服务治理方面更优雅一些。结束语本文主要是回顾一下分库分表的一些基础概念,为大家在后续ShardingSphere实践中更好上手理解,内容里很多概念一笔带过没详细展开,接下来的篇幅会逐一解读。下一篇预告《分库分表ShardingSphere的基础知识点梳理》欢迎关注 公众号:程序员小富,咱们下期再见!以上文章来自[掘金]-[程序员小富]本程序使用github开源项目RSSHub提取聚合!
2022年10月20日
0 阅读
0 评论
0 点赞
2022-10-19
Java Lambda 表达式的各种形态和使用场景,看这篇就够了-掘金
本文为掘金社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!Lambda 表达式是 Java 8 中添加的功能。引入 Lambda 表达式的主要目的是为了让 Java 支持函数式编程。 Lambda 表达式是一个可以在不属于任何类的情况下创建的函数,并且可以像对象一样被传递和执行。Java lambda 表达式用于实现简单的单方法接口,与 Java Streams API 配合进行函数式编程。在前几篇关于 List、Set 和 Map 的文章中,我们已经看到了这几个 Java 容器很多操作都是通过 Stream 完成的,比如过滤出对象 List 中符合条件的子集时,会使用类似下面的 Stream 操作。List list = aList.filter(a -> a.getId() > 10).collect(Colletors.toList); 其中filter方法里用到的a -> a.getId() > 10就是一个 Lambda 表达式,前面几篇文章我们主要策略讲集合框架里那几个常用 Java 容器的使用,对用到 Lambda 的地方知识简单的说了一下,如果你对各种 Stream 操作有疑问,可以先把本篇 Lambda 相关的内容学完,接下来再仔细梳理 Stream 时就会好理解很多了。本文内容大纲如下: Lambda 表达式和函数式接口上面说了 lambda 表达式便于实现只拥有单一方法的接口,同样在 Java 里匿名类也用于快速实现接口,只不过 lambda 相较于匿名类更方便些,在书写的时候连创建类的步骤也免去了,更适合用在函数式编程。举个例子来说,函数式编程经常用在实现事件 Listener 的时候 。 在 Java 中的事件侦听器通常被定义为具有单个方法的 Java 接口。下面是一个 Listener 接口示例:public interface StateChangeListener { public void onStateChange(State oldState, State newState); } 上面这个 Java 接口定义了一个只要被监听对象的状态发生变化,就会调用的 onStateChange 方法(这里不用管监听的是什么,举例而已)。 在 Java 8 版本以前,监听事件变更的程序必须实现此接口才能侦听状态更改。比如说,有一个名为 StateOwner 的类,它可以注册状态的事件侦听器。public class StateOwner { public void addStateListener(StateChangeListener listener) { ... } } 我们可以使用匿名类实现 StateChangeListener 接口,然后为 StateOwner 实例添加侦听器。StateOwner stateOwner = new StateOwner(); stateOwner.addStateListener(new StateChangeListener() { public void onStateChange(State oldState, State newState) { // do something with the old and new state. System.out.println("State changed") } }); 在 Java 8 引入Lambda 表达式后,我们可以用 Lambda 表达式实现 StateChangeListener 接口会更加方便。现在,把上面例子接口的匿名类实现改为 Lambda 实现,程序会变成这样:StateOwner stateOwner = new StateOwner(); stateOwner.addStateListener( (oldState, newState) -> System.out.println("State changed") ); 在这里,我们使用的 Lambda 表达式是:(oldState, newState) -> System.out.println("State changed") 这个 lambda 表达式与 StateChangeListener 接口的 onStateChange() 方法的参数列表和返回值类型相匹配。如果一个 lambda 表达式匹配单方法接口中方法的参数列表和返回值(比如本例中的 StateChangeListener 接口的 onStateChange 方法),则 lambda 表达式将转换为拥有相同方法签名的接口实现。 这句话听着有点绕,下面详细解释一下 Lambda 表达式和接口匹配的详细规则。匹配Lambda 与接口的规则上面例子里使用的 StateChangeListener 接口有一个特点,其只有一个未实现的抽象方法,在 Java 里这样的接口也叫做函数式接口 (Functional Interface)。将 Java lambda 表达式与接口匹配需要满足一下三个规则: 接口是否只有一个抽象(未实现)方法,即是一个函数式接口? lambda 表达式的参数是否与抽象方法的参数匹配? lambda 表达式的返回类型是否与单个方法的返回类型匹配? 如果能满足这三个条件,那么给定的 lambda 表达式就能与接口成功匹配类型。函数式接口只有一个抽象方法的接口被称为函数是式接口,从 Java 8 开始,Java 接口中可以包含默认方法和静态方法。默认方法和静态方法都有直接在接口声明中定义的实现。这意味着,Java lambda 表达式可以实现拥有多个方法的接口——只要接口中只有一个未实现的抽象方法就行。所以在文章一开头我说lambda 用于实现单方法接口,是为了让大家更好的理解,真实的情况是只要接口中只存在一个抽象方法,那么这个接口就能用 lambda 实现。换句话说,即使接口包含默认方法和静态方法,只要接口只包含一个未实现的抽象方法,它就是函数式接口。比如下面这个接口:import java.io.IOException; import java.io.OutputStream; public interface MyInterface { void printIt(String text); default public void printUtf8To(String text, OutputStream outputStream){ try { outputStream.write(text.getBytes("UTF-8")); } catch (IOException e) { throw new RuntimeException("Error writing String as UTF-8 to OutputStream", e); } } static void printItToSystemOut(String text){ System.out.println(text); } } 即使这个接口包含 3 个方法,它也可以通过 lambda 表达式实现,因为接口中只有一个抽象方法 printIt没有被实现。MyInterface myInterface = (String text) -> { System.out.print(text); }; Lambda VS 匿名类尽管 lambda 表达式和匿名类看起来差不多,但还是有一些值得注意的差异。 主要区别在于,匿名类可以有自己的内部状态--即成员变量,而 lambda 表达式则不能。public interface MyEventConsumer { public void consume(Object event); } 比如上面这个接口,通过匿名类实现MyEventConsumer consumer = new MyEventConsumer() { public void consume(Object event){ System.out.println(event.toString() + " consumed"); } }; MyEventConsumer 接口的匿名类可以有自己的内部状态。MyEventConsumer myEventConsumer = new MyEventConsumer() { private int eventCount = 0; public void consume(Object event) { System.out.println(event.toString() + " consumed " + this.eventCount++ + " times."); } }; 我们给匿名类,加了一个名为 eventCount 的整型成员变量,用来记录匿名类 consume 方法被执行的次数。Lambda 表达式则不能像匿名类一样添加成员变量,所以也成 Lambda 表达式是无状态的。推断 Lamdba 的接口类型使用匿名类实现函数式接口的时候,必须在 new 关键字后指明实现的是哪个接口。比如上面使用过的匿名类例子stateOwner.addStateListener(new StateChangeListener() { public void onStateChange(State oldState, State newState) { // do something with the old and new state. } }); 但是 lambda 表达式,通常可以从上下文中推断出类型。例如,可以从 addStateListener() 方法声明中参数的类型 StateChangeListener 推断出来,Lambda 表达式要实现的是 StateChangeListener 接口。stateOwner.addStateListener( (oldState, newState) -> System.out.println("State changed") ); 通常 lambda 表达式参数的类型也可以推断出来。在上面的示例中,编译器可以从StateChangeListener 接口的抽象方法 onStateChange() 的方法声明中推断出参数 oldState 和 newState 的类型。Lambda 的参数形式由于 lambda 表达式实际上只是个方法,因此 lambda 表达式可以像方法一样接受参数。Lambda 表达式参数根据参数数量以及是否需要添加类型会有下面几个形式。如果表达式的方法不带参数,那么可以像下面这样编写 Lambda 表达式:() -> System.out.println("Zero parameter lambda"); 如果表达式的方法接受一个参数,则可以像下面这样编写 Lambda 表达式:(param) -> System.out.println("One parameter: " + param); 当 Lambda 表达式只接收单个参数时,参数列表外的小括号也可以省略掉。param -> System.out.println("One parameter: " + param); 当 Lambda 表达式接收多个参数时,参数列表的括号就没法省略了。如果编译器无法从 Lambda 匹配的函数式接口的方法声明推断出参数类型(出现这种情况时,编译器会提示),则有时可能需要为 Lambda 表达式的参数指定类型。(Car car) -> System.out.println("The car is: " + car.getName()); Lambda 的方法体lambda 表达式的方法的方法体,在 Lambda 声明中的 -> 右侧指定:(oldState, newState) -> System.out.println("State changed") 如果 Lambda 表达式的方法体需要由多行组成,则需要把多行代码写在用{ }括起来的代码块内。(oldState, newState) -> { System.out.println("Old state: " + oldState); System.out.println("New state: " + newState); } Lamdba 表达式的返回值可以从 Lambda 表达式返回值,就像从方法中返回值一样。只需在 Lambda 的方法体中添加一个 return 语句即可:(param) -> { System.out.println("param: " + param); return "return value"; } 如果 Lambda 表达式所做的只是计算返回值并返回它,我们甚至可以省略 return 语句。(a1, a2) -> { return a1 > a2; } // 上面的可以简写成,不需要return 语句的 (a1, a2) -> { a1 > a2; } Lambda 表达式本质上是一个对象,跟其他任何我们使用过的对象一样, 我们可以将 Lambda 表达式赋值给变量并进行传递和使用。public interface MyComparator { public boolean compare(int a1, int a2); } --- MyComparator myComparator = (a1, a2) -> a1 > a2; boolean result = myComparator.compare(2, 5); 上面的这个例子展示 Lambda 表达式的定义,以及如何将 Lambda 表达式赋值给给变量,最后通过调用它实现的接口方法来调用 Lambda 表达式。外部变量在 Lambda 内的可见性在某些情况下,Lambda 表达式能够访问在 Lambda 函数体之外声明的变量。 Lambda 可以访问以下类型的变量: 局部变量 实例变量 静态变量 **Lambda 内访问局部变量,**Lambda 可以访问在 Lambda 方法体之外声明的局部变量的值public interface MyFactory { public String create(char[] chars); } String myString = "Test"; MyFactory myFactory = (chars) -> { return myString + ":" + new String(chars); }; Lambda 访问实例变量,Lambda 表达式还可以访问创建了 Lambda 的对象中的实例变量。public class EventConsumerImpl { private String name = "MyConsumer"; public void attach(MyEventProducer eventProducer){ eventProducer.listen(e -> { System.out.println(this.name); }); } } 这里实际上也是 Lambda 与匿名类的差别之一。匿名类因为可以有自己的实例变量,这些变量通过 this 引用来引用。但是,Lambda 不能有自己的实例变量,因此 this 始终指向外面包裹 Lambda 的对象。**Lambda 访问静态变量,**Lambda 表达式也可以访问静态变量。这也不奇怪,因为静态变量可以从 Java 应用程序中的任何地方访问,只要静态变量是公共的。public class EventConsumerImpl { private static String someStaticVar = "Some text"; public void attach(MyEventProducer eventProducer){ eventProducer.listen(e -> { System.out.println(someStaticVar); }); } } 把方法引用作为 Lambda如过编写的 lambda 表达式所做的只是使用传递给 Lambda 的参数调用另一个方法,那么 Java里为 Lambda 实现提供了一种更简短的形式来表达方法调用。比如说,下面是一个函数式数接口:public interface MyPrinter{ public void print(String s); } 接下来我们用 Lambda 表达式实现这个 MyPrinter 接口MyPrinter myPrinter = (s) -> { System.out.println(s); }; 因为 Lambda 的参数只有一个,方法体也只包含一行,所以可以简写成MyPrinter myPrinter = s -> System.out.println(s); 又因为 Lambda 方法体内所做的只是将字符串参数转发给 System.out.println() 方法,因此我们可以将上面的 Lambda 声明替换为方法引用。MyPrinter myPrinter = System.out::println; 注意双冒号 :: 向 Java 的编译器指明这是一个方法的引用。引用的方法是双冒号之后的方法。而拥有引用方法的类或对象则位于双冒号之前。我们可以引用以下类型的方法: 静态方法 参数对象的实例方法 实例方法 类的构造方法 引用类的静态方法最容易引用的方法是静态方法,比如有这么一个函数式接口和类public interface Finder { public int find(String s1, String s2); } public class MyClass{ public static int doFind(String s1, String s2){ return s1.lastIndexOf(s2); } } 如果我们创建 Lambda 去调用 MyClass 的静态方法 doFindFinder finder = (s1, s2) -> MyClass.doFind(s1, s2); 所以我们可以使用 Lambda 直接引用 Myclass 的 doFind 方法。Finder finder = MyClass::doFind; 引用参数的方法接下来,如果我们在 Lambda 直接转发调用的方法是来自参数的方法public interface Finder { public int find(String s1, String s2); } Finder finder = (s1, s2) -> s1.indexOf(s2); 依然可以通过 Lambda 直接引用Finder finder = String::indexOf; 这个与上面完全形态的 Lambda 在功能上完全一样,不过要注意简版 Lambda 是如何引用单个方法的。 Java 编译器会尝试将引用的方法与第一个参数的类型匹配,使用第二个参数类型作为引用方法的参数。引用实例方法我们还也可以从 Lambda 定义中引用实例方法。首先,设想有如下接口public interface Deserializer { public int deserialize(String v1); } 该接口表示一个能够将字符串“反序列化”为 int 的组件。现在有一个 StringConvert 类public class StringConverter { public int convertToInt(String v1){ return Integer.valueOf(v1); } } StringConvert 类 的 convertToInt() 方法与 Deserializer 接口的 deserialize() 方法具有相同的签名。因此,我们可以创建 StringConverter 的实例并从 Lambda 表达式中引用其 convertToInt() 方法,如下所示:StringConverter stringConverter = new StringConverter(); Deserializer des = stringConverter::convertToInt; // 等同于 Deserializer des = (value) -> stringConverter.convertToInt(value) 上面第二行代码创建的 Lambda 表达式引用了在第一行创建的 StringConverter 实例的 convertToInt 方法。引用构造方法最后如果 Lambda 的作用是调用一个类的构造方法,那么可以通过 Lambda 直接引用类的构造方法。在 Lambda 引用类构造方法的形式如下:ClassName::new 那么如何将构造方法用作 lambda 表达式呢,假设我们有这样一个函数式接口public interface Factory { public String create(char[] val); } Factory 接口的 create() 方法与 String 类中的其中一个构造方法的签名相匹配(String 类有多个重载版本的构造方法)。因此,String类的该构造方法也可以用作 Lambda 表达式。Factory factory = String::new; // 等同于 Factory factory (chars) -> String.new(chars); 总结今天这篇文章把 Lambda 表达式的知识梳理的了一遍,相信看完了这里的内容,再看到 Lambda 表达式的各种形态就不觉得迷惑了,虽然今天的文章看起来有点枯燥,不过是接下来 咱们系统学习 Stream 操作的基础,以及后面介绍 Java 中提供的几个函数式编程 interface 也会用到 Lambda 里的知识,后面的内容可以继续期待一下。以上文章来自[掘金]-[kevinyan]本程序使用github开源项目RSSHub提取聚合!
2022年10月19日
1 阅读
0 评论
0 点赞
2022-10-19
Tomcat 调优之从 Linux 内核源码层面看 Tcp backlog-掘金
我正在参加「掘金·启航计划」前两天看到一群里在讨论 Tomcat 参数调优,看到不止一个人说通过 accept-count 来配置线程池大小,我笑了笑,看来其实很多人并不太了解我们用的最多的 WebServer Tomcat,这篇文章就来聊下 Tomcat 调优,重点介绍下线程池调优及 TCP 半连接、全连接队列调优。Tomcat 线程池先来说下线程池调优,就拿 SpringBoot 内置的 Tomcat 来说,确实是支持线程池参数配置的,但不是 accept-count 参数,可以通过 threads.max 和 threads.minSpare 来配置线程池最大线程数和核心线程数。如果没有设置,则会使用默认值threads.max: 200 threads.minSpare: 10 Tomcat 底层用到的 ThreadPoolExecutor 也不是 JUC 原生的线程池,而是自定义的,做了一些调整来支持 IO 密集型场景使用,具体介绍可以看之前写的两篇文章。动态线程池(DynamicTp),动态调整 Tomcat、Jetty、Undertow 线程池参数篇以面试官视角万字解读线程池 10 大经典面试题!通过这两篇文章能了解到 Tomcat 自定义线程池的执行流程及原理,然后可以接入动态线程池框架 DynamicTp,将 Tomcat 线程池交由 DynamicTp 管理,使之能享受到动态调参、监控告警的功能。在配置中心配置 tomcat 线程池核心参数spring: dynamic: tp: tomcatTp: corePoolSize: 100 maximumPoolSize: 400 keepAliveTime: 60 Tomcat 线程池调优主要思想就是动态化线程池参数,上线前通过压测初步确定一套较优的参数值,上线后通过监控、告警实时感知线程池负载情况,动态调整参数适应流量的变化。线程池调优就说这些吧,下面主要介绍下 Tcp backlog 及半连接、全连接队列相关内容。划重点 threads.max 和 threads.minSpare 是用来配置 Tomcat 的工作线程池大小的,是线程池维度的参数 accept-count 和 max-connections 是 TCP 维度的配置参数 TCP 状态机Client 端和 Server 端基于 TCP 协议进行通信时,首先需要经过三次握手建连的,通信结束时需要通过四次挥手断连的。注意所谓的连接其实是个逻辑上的概念,并不存在真实连接的,那 TCP 是怎么面向连接传输的呢?TCP 定义了个复杂的有限状态机模型,通信双方通过维护一个连接状态,来达到看起来像有一条连接的效果。如下是 TCP 状态机状态流转图,这个图非常重要,建议大家一定要掌握。图片来自 TCP 状态机 图上半部分描述了三次握手建立连接过程中状态的变化 图下半部分描述了四次挥手断开连接过程中状态的变化 图 2 是通过三次握手建立连接的过程,老八股文了,建议结合图 1 状态机变化图看,图片来源三次握手图 3 是通过四次挥手断开连接的过程,建议结合图 1 状态机变化图看,图片来源四次挥手服务端程序调用 listen() 函数后,TCP 状态机从 CLOSED 转变为 LISTEN,并且 linux 内核会创建维护两个队列。一个是半连接队列(Syn queue),另一个是全连接队列(Accept queue)。建连主要流程如下: 客户端向服务端发送 SYN 包请求建立连接,发送后客户端进入 SYN_SENT 状态 服务端收到客户端的 SYN 请求,将该连接存放到半连接队列(Syn queue)中,并向客户端回复 SYN + ACK,随后服务端进入 SYN_RECV 状态 客户端收到服务端的 SYN + ACK 后,回复服务端 ACK 并进入 ESTABLISHED 状态 服务端收到客户端的 ACK 后,从半连接队列中取出连接放到全连接队列(Accept queue)中,服务端进入 ESTABLISHED 状态 服务端程序调用 accept() 方法,从全连接队列中取出连接进行处理请求 连接队列大小上述提到了半连接队列、全连接队列,这两队列都有大小限制的,超过的连接会被丢掉或者返回 RST 包。半连接队列大小主要受:listen backlog、somaxconn、tcp_max_syn_backlog 这三参数影响全连接队列大小主要受:listen backlog 和 somaxconn 这两参数影响tcp_max_syn_backlog 和 somaxconn 都是 linux 内核参数,在 /proc/sys/net/ipv4/ 和 /proc/sys/net/core/ 下,可以通过 /etc/sysctl.conf 文件来修改,默认值为 128。listen backlog 参数其实就是我们调用 listen 函数时传入的第二个参数。回到主题,Tomcat 的 accept-count 其实最后就会传给 listen 函数做 backlog 用。int listen(int sockfd, int backlog); 可以在配置文件中配置 tomcat accept-count 大小,默认为 100以下代码注释中也注明了 acceptCount 就是 backlog以 Nio2Endpoint 为例看下代码,bind 方法首先会根据配置的核心线程数、最大线程数创建 worker 线程池。然后调用 jdk nio2 中的 AsynchronousServerSocketChannelImpl 的 bind 方法,该方法内会调用 Net.listen() 进行 socket 监听。通过这几段代码,我们可以清晰的看到 Tomcat accept-count = Tcp backlog,默认值为 100。上面说到了半全两个连接队列,至于这两个连接队列大小怎么确定,其实不同 linux 内核版本算法也都不太一样,我们就以 v3.10 来看。以下是 linux 内核 socket.c 中的源码,也就是我们调用 listen() 函数会执行的代码/* * Perform a listen. Basically, we allow the protocol to do anything * necessary for a listen, and if that works, we mark the socket as * ready for listening. */ SYSCALL_DEFINE2(listen, int, fd, int, backlog) { struct socket *sock; int err, fput_needed; int somaxconn; sock = sockfd_lookup_light(fd, &err, &fput_needed); if (sock) { somaxconn = sock_net(sock->sk)->core.sysctl_somaxconn; if ((unsigned int)backlog > somaxconn) backlog = somaxconn; err = security_socket_listen(sock, backlog); if (!err) err = sock->ops->listen(sock, backlog); fput_light(sock->file, fput_needed); } return err; } 可以看到,此处会拿内核参数 somaxconn 和 传入的 backlog 做比较,取二者中的较小者作为全连接队列大小。全连接队列大小 = min(backlog, somaxconn)。接下来 backlog 会依次传递给如下函数,格式约定(源代码文件名#函数名)af_inet.c#inet_listen() -> inet_connection_sock.c#inet_csk_listen_start() -> request_sock.c#reqsk_queue_alloc()reqsk_queue_alloc() 函数代码如下,主要就是用来计算半连接队列大小的。计算逻辑可以简化为下述公式,简单描述 roundup_pow_of_two 算法就是向上取最接近的最大 2 的指数次幂,注意此处 backlog 已经是 min(backlog, somaxconn)半连接队列大小 = roundup_pow_of_two(max(8, min(backlog, tcp_max_syn_backlog))+1)代码里 max_qlen_log 在一个 for 循环里计算,比如算出的半连接队列大小 nr_table_entries = 16 = 2^4,那么 max_qlen_log = 4,该值在判断半连接队列是否溢出时会用到。举个例子,如果 listen backlog = 10、somaxconn = 128、tcp_max_syn_backlog = 128,那么半连接队列大小 = 16,全连接队列大小 = 10。所以要知道,在做连接队列大小调优的时候,一定要综合上述三个参数,只修改某一个起不到想要的效果。连接队列大小查看全连接队列大小可以通过 linux 提供的 ss 命令来查看全连接队列的大小参数说明,参数很多,其他参数可以自己 help 查看说明l:表示显示 listening 状态的 socketn:不解析服务名称t:只显示 tcp sockets这个命令结果怎么解读呢?主要看前三个字段,Recv-Q 和 Send-Q 在 State 为 LISTEN 和非 LISTEN 状态时代表不同的含义。State: LISTENRecv-Q: 全连接队列的当前长度,也就是已经完成三次握手等待服务端调用 accept() 方法获取的连接数量Send-Q: 全连接队列的最大长度,也就是我们上述分析的 backlog 和somaxconn 的最小值State: 非 LISTENRecv-Q: 已接受但未被应用进程读取的字节数Send-Q: 已发送但未收到确认的字节数以上区别从如下内核代码也可以看出,ss 命令就是从 tcp_diag 模块获取的数据半连接队列大小半连接队列没有像 ss 这种命令直接查看,但服务端处于 SYN_RECV 状态的连接都在半连接队列里,所以可以通过如下命令间接统计netstat -natp | grep SYN_RECV | wc -l 半连接队列最大长度可以使用我们上述分析得到的公式计算得到半全连接队列溢出全连接队列溢出当请求量很大,全连接队列比较小时,就有可能发生全连接队列溢出的情况。此代码是 linux 内核用来判断全连接队列是否已满的函数,可以看到判断用的是大于号,这也就是我们用 ss 命令可能会看到 Recv-Q > Send-Q 的原因 sk_ack_backlog 是当前全连接队列的大小 sk_max_ack_backlog 是全连接队列的最大长度,也就是 min(listen_backlog, somaxconn) 当全连接队列满了发生溢出时,会根据 /proc/sys/net/ipv4/tcp_abort_on_overflow 内核参数来决定怎么处理后续的 ack 请求,tcp_abort_on_overflow 默认值为 0。 当 tcp_abort_on_overflow = 0 时,如果全连接队列已满,服务端会直接扔掉客户端发送的 ACK,此时服务端处于 SYN_RECV 状态,客户端处于 ESTABLISHED 状态,服务端的超时重传定时器会重传 SYN + ACK 包给客户端(重传次数由/proc/sys/net/ipv4/tcp_synack_retries 指定,默认值为 5,重试间隔为 1s、2s、4s、8s、16s,共 31s,第 5 次发出后还要等 32s 才知道第 5 次也超时了,所以总共需要 63s)。超过 tcp_synack_retries 后,服务端不会在重传,这时如果客户端发送数据过来,服务端会返回 RST 包,客户端会报 connection reset by peer 异常 当 tcp_abort_on_overflow = 1 时,如果全连接队列已满,服务端收到客户端的 ACK 后,会发送一个 RST 包给客户端,表示结束掉这个握手过程和这个连接,客户端会报 connection reset by peer 异常 一般情况下 tcp_abort_on_overflow 保持默认值 0 就行,能提高建立连接的成功率半连接队列溢出我们知道,服务端收到客户端发送的 SYN 包后会将该连接放入半连接队列中,然后回复 SYN+ACK,如果客户端一直不回复 ACK 做第三次握手,这样就会使得服务端有大量处于 SYN_RECV 状态的 TCP 连接存在半连接队列里,超过设置的队列长度后就会发生溢出。下述代码是 linux 内核判断是否发生半连接队列溢出的函数// 代码在 include/net/inet_connection_sock.h 中 static inline int inet_csk_reqsk_queue_is_full(const struct sock *sk) { return reqsk_queue_is_full(&inet_csk(sk)->icsk_accept_queue); } // 代码在 include/net/request_sock.h 中 static inline int reqsk_queue_is_full(const struct request_sock_queue *queue) { /* * qlen 是当前半连接队列大小 * max_qlen_log 上述解释过,如果半连接队列大小 = 16 = 2^4,那么该值就是4 * 非常巧妙的用了移位运行来判断半连接队列是否溢出,底层满满的都是细节 */ return queue->listen_opt->qlen >> queue->listen_opt->max_qlen_log; } 我们常说的 SYN Flood 洪水攻击 是一种典型的 DDOS 攻击,就是利用了这个点,给服务端发送一个 SYN 包后客户端就下线了,服务端会超时重传 SYN+ACK 包,上述也说了总共需要 63s 才停止重传,也就是说服务端需要经过 63s 后才断开该连接,这样就会导致半连接队列快速被耗尽,不能处理正常的请求。那是怎么防止攻击的呢?linux 提供个一个内核参数 /proc/sys/net/ipv4/tcp_syncookies 来应对该攻击,当半连接队列满了且开启 tcp_syncookies = 1 配置时,服务端在收到 SYN 并返回 SYN+ACK 后,不将该连接放入半连接队列,而是根据这个 SYN 包 TCP 头信息计算出一个 cookie 值。将这个 cookie 作为第二次握手 SYN+ACK 包的初始序列号 seq 发过去,如果是攻击者,就不会有响应,如果是正常连接,客户端回复 ACK 包后,服务端根据头信息计算 cookie,与返回的确认序列号进行比对,如果相同,则是一个正常建立连接。下述代码是计算 cookie 的函数,可以看到跟这些字段有关(源 ip、源端口、目标 ip、目标端口、客户端 syn 包序列号、时间戳、mssind)下面看下第一次握手,收到 SYN 包后服务端的处理代码,代码太多,简化提出跟半连接队列溢出相关代码int tcp_v4_conn_request(struct sock *sk, struct sk_buff *skb) { /* * 如果半连接队列已满,且 tcp_syncookies 未开启,则直接丢弃该连接 */ if (inet_csk_reqsk_queue_is_full(sk) && !isn) { want_cookie = tcp_syn_flood_action(sk, skb, "TCP"); if (!want_cookie) goto drop; } /* * 如果全连接队列已满,并且没有重传 SYN+ACk 包的连接数量大于1,则直接丢弃该连接 * inet_csk_reqsk_queue_young 获取没有重传 SYN+ACk 包的连接数量 */ if (sk_acceptq_is_full(sk) && inet_csk_reqsk_queue_young(sk) > 1) { NET_INC_STATS_BH(sock_net(sk), LINUX_MIB_LISTENOVERFLOWS); goto drop; } // 分配 request sock 内核对象 req = inet_reqsk_alloc(&tcp_request_sock_ops); if (!req) goto drop; if (want_cookie) { // 如果开启了 tcp_syncookies 且半连接队列已满,则计算 cookie isn = cookie_v4_init_sequence(sk, skb, &req->mss); req->cookie_ts = tmp_opt.tstamp_ok; } else if (!isn) { /* 如果没有开启 tcp_syncookies 并且 max_syn_backlog - 半连接队列当前大小 < max_syn_backlog >> 2,则丢弃该连接 */ else if (!sysctl_tcp_syncookies && (sysctl_max_syn_backlog - inet_csk_reqsk_queue_len(sk) < (sysctl_max_syn_backlog >> 2)) && !tcp_peer_is_proven(req, dst, false)) { LIMIT_NETDEBUG(KERN_DEBUG pr_fmt("drop open request from %pI4/%u "), &saddr, ntohs(tcp_hdr(skb)->source)); goto drop_and_release; } isn = tcp_v4_init_sequence(skb); } tcp_rsk(req)->snt_isn = isn; // 构造 syn+ack 响应包 skb_synack = tcp_make_synack(sk, dst, req, fastopen_cookie_present(&valid_foc) ? &valid_foc : NULL); if (likely(!do_fastopen)) { int err; // 发送 syn+ack 响应包 err = ip_build_and_send_pkt(skb_synack, sk, ireq->loc_addr, ireq->rmt_addr, ireq->opt); err = net_xmit_eval(err); if (err || want_cookie) goto drop_and_free; tcp_rsk(req)->snt_synack = tcp_time_stamp; tcp_rsk(req)->listener = NULL; // 添加到半连接队列,并且开启超时重传定时器 inet_csk_reqsk_queue_hash_add(sk, req, TCP_TIMEOUT_INIT); } else if (tcp_v4_conn_req_fastopen(sk, skb, skb_synack, req)) goto drop_and_free; } 查看溢出命令当连接队列溢出时,可以通过 netstart -s 命令查询 # 表示全连接队列溢出的次数,累计值 119005 times the listen queue of a socket overflowed # 表示半连接队列溢出的次数,累计值 119085 SYNs to LISTEN sockets dropped 如果发现这两个值一直在增加,就说明发生了队列溢出,需要看情况调大队列大小常用组件 backlog 大小 Redis 默认 backlog = 511 Nginx 默认 backlog = 511 Mysql 默认 backlog = 50 Undertow 默认 backlog = 1000 Tomcat 默认 backlog = 100 总结这篇文章以 Tomcat 性能调优为切入点,首先简单讲了下 Tomcat 线程池调优。然后借 Tomcat 配置参数 accept-count 引出了 Tcp backlog,从 linux 内核源码层面详细讲解了下 TCP backlog 参数以及半连接、全连接队列的相关知识,包括连接队列大小设置,以及队列溢出怎么排查,这些东西也是我们服务端开发需要掌握的,在性能调优,问题排查时会有一定的帮助。个人开源项目DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为动态调参、通知报警、运行监控、三方包线程池管理等几大类。目前累计 2k star,欢迎大家试用,感谢你的 star,欢迎 pr,业务之余一起给开源贡献一份力量官网:https://dynamictp.cngitee 地址:https://gitee.com/dromara/dynamic-tpgithub 地址:https://github.com/dromara/dynamic-tp以上文章来自[掘金]-[CodeFox]本程序使用github开源项目RSSHub提取聚合!
2022年10月19日
2 阅读
0 评论
0 点赞
2022-10-19
前端要树形数据 我三分钟接口扔给他-掘金
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第14天,点击查看活动详情前言树状结构是我们在日常工作中经常需要返回的数据结构 一个小小的数状结构也能看出一个开发者的思维方式 我个人最开始写这种树状结构真的是代码老长了 别人一看就会知道我是一个菜鸟 慢慢的自己思考怎么去用最少的代码去搭建一个树状结构 也做一些整理 编码思路真的很重要 思路好代码才会简洁健壮准备实体类我们先定义一个实体类 方便演示 第一种方法的算是和构造器相互依赖Node.class:import lombok.Data; import java.util.ArrayList; import java.util.List; /** * @author变成派大星 */ @Data public class Node { private Integer Id; private String name; private Integer pid; private List treeNode = new ArrayList(); public Node(int id, int pid) { this.Id = id; this.pid = pid; } public Node(int id, int pid, String name) { this(id, pid); this.name = name; } } 思路一我们的树状图一般都是一个动态的结构也就是说我们不能把代码写的太死板 无论树的深度怎么增加我们都不需要更改代码 一般来说 树状结构都有一个核心字段pid 第一个节点 都是为 0 第二个节点的Pid 是第一个节点的标识 我们思路一就是从这边出发 先将第一层节点找出来 剩下的节点通过Pid 进行分组 生成一个Map pid作为Key Node 作为Value 通过循环集合 去Map里面取Key对应的节点Set到自己的子节点下面 去除数据中第一层Pid != 0 的数据4.1 如下图 代码演示 这个需要看一下代码逻辑/** * @author 变成派大星 */ @Service @AllArgsConstructor public class NodeServiceImpl extends ServiceImpl implements NodeService { private NodeMapper nodeMapper; /** * @return 进行树结构的处理操作 */ @Override public List handleTreeVo() { Node first = new Node(1, 0, "first"); Node second = new Node(2, 1, "second"); Node third = new Node(3, 2, "third"); Node second001 = new Node(4, 1, "second001"); Node third001 = new Node(5, 4, "third001"); // 组装树状数据 List nodes = Arrays.asList(first,second,third,second001,third001); return buildTree(nodes); } public List buildTree(List nodes) { //将这些非顶级节点的数据按pid进行分组 这个是根据pid为key 第一步过滤非Pid=0的节点 第二步进行分组 Map nodeMap = nodes.stream().filter(node->node.getPid()!=0) .collect(Collectors.groupingBy(node -> node.getPid())); //循环设置对应的子节点(根据id = pid) 上一步以pid为Key 所以就直接循环获取 nodes.forEach(node -> node.setTreeNode(nodeMap.get(node.getId()))); //过滤第一层不是Pid为零的数据 也就是没有根节点的数据 List treeNode = nodes.stream().filter(node -> node.getPid() == 0).collect(Collectors.toList()); return treeNode; } } 结果小总结上面这种写法 几行代码就能生成一个树状数据 也算一种思路优点的话就是代码简单简洁要说缺点 可能最后一步有点不舒服 也是需要改进的地方 而且这个是专门配合的实体类 真正开发中会比这麻烦点 前端也喜欢后端开发给的树状图字段名都是一样的 所有一般都要写一个Vo返回类 这个配合返回类麻烦程度也会增加思路二因为树状数据比较常用 我们每次都写一个方法肯定比较笨 我们可以利用泛型区写一个工具类 我先把代码写出来然后再解释 这个是可以直接使用的 是比较通用的import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.function.BiConsumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; /** * @author 变成派大星 */ public class TreeUtils { /** * @param list 源数据 * @param setChildListFn 设置递归的方法 * @param idFn 获取id的方法 * @param pidFn 获取父id的方法 * @param getRootCondition 获取根节点的提哦啊见 * @return 将List 转换成 Tree */ public static List listToTree(List list, Function idFn, Function pidFn, BiConsumer setChildListFn, Predicate getRootCondition) { if (CollUtil.isEmpty(list)) return null; Map listMap = list.stream().collect(Collectors.groupingBy(pidFn)); list.forEach(model -> setChildListFn.accept(model, listMap.get(idFn.apply(model)))); return list.stream().filter(getRootCondition).collect(Collectors.toList()); } public static List treeToList(List source, Function getChildListFn, BiConsumer setChildListFn, Predicate getRootCondition) { List target = new ArrayList(); if (CollectionUtils.isNotEmpty(source)) { treeToList(source, target, getChildListFn); target.forEach(model -> setChildListFn.accept(model, null)); target.addAll(target.stream().filter(getRootCondition).collect(Collectors.toList())); } return target; } private static void treeToList(List source, List target, Function getChildListFn) { if (CollectionUtils.isNotEmpty(source)) { target.addAll(source); source.forEach(model -> { List childList = getChildListFn.apply(model); if (CollectionUtils.isNotEmpty(childList)) { treeToList(childList, target, getChildListFn); } }); } } } 方法一:listToTree就是将你的数据从list转换成Tree结构public List handleTree(){ Node first = new Node(1, 0, "first"); Node second = new Node(2, 1, "second"); Node third = new Node(3, 2, "third"); Node second001 = new Node(4, 1, "second001"); Node third001 = new Node(5, 4, "third001"); List nodes = Arrays.asList(first,second,third,second001,third001); // 只需要一行数据 return TreeUtils.listToTree(nodes, Node::getId, Node::getPid, Node::setTreeNode, (l) -> l.getPid() == 0); } 结果这个其实非常方便 基本上一分钟就能转换了 再也不会被前端催了 也可以自己去改写 按照自己的需求去调整 思路很像 只不过是去优化方法二 Tree 转 Listpublic List handleTree(){ Node first = new Node(1, 0, "first"); Node second = new Node(2, 1, "second"); Node third = new Node(3, 2, "third"); Node second001 = new Node(4, 1, "second001"); Node third001 = new Node(5, 4, "third001"); List nodes = Arrays.asList(first,second,third,second001,third001); List nodeList = TreeUtils.listToTree(nodes, Node::getId, Node::getPid, Node::setTreeNode, (l) -> l.getPid() == 0); // 树状结构转换成 List 也就是还原数据 return TreeUtils.treeToList(nodeList, Node::getTreeNode, Node::setTreeNode, (l) -> l.getPid() == 0); } 可以去反转树结构 用处的看自己的业务需求 可以看一下思路总结也许有很多类去实现树结构 只需要很少的代码 但是有些思路还是要学的 也有其他暴力组成 但是基本思路差不多 如果各位大佬有更好的思路 可以提醒一下 祝顺利以上文章来自[掘金]-[变成派大星]本程序使用github开源项目RSSHub提取聚合!
2022年10月19日
0 阅读
0 评论
0 点赞
2022-10-18
(七)MySQL事务篇:ACID原则、事务隔离级别及事务机制原理剖析-掘金
本文为掘金社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究! 众所周知,MySQL数据库的核心功能就是存储数据,通常是整个业务系统中最重要的一层,可谓是整个系统的“大本营”,因此只要MySQL存在些许隐患问题,对于整个系统而言都是致命的。那此刻不妨思考一个问题:MySQL在接受外部数据写入时,有没有可能会发生问题呢?有人也许会笑着回答:“那怎么可能啊,MySQL在写入数据时怎么会存在问题呢”。 的确,MySQL本身在写入数据时并不会有问题,就算部署MySQL的机器断电/宕机,其内部也有一套健全的机制确保数据不丢失。但往往风险并不来自于表象,虽然MySQL写入数据没问题,但结合业务来看就会有一个很大的隐患,此话怎讲呐?先看案例:-- 从库存表中扣减商品数量 UPDATE `zz_inventory` SET ......; -- 向订单表、订单详情表中插入订单记录 INSERT INTO `zz_order` VALUES(....); INSERT INTO `zz_order_info` VALUES(....); -- 向物流表中插入相应的物流信息 INSERT INTO `zz_logistics` VALUES(....); 上述的伪SQL中,描述的是一个经典下单业务,先扣库存数量、再增加订单记录、再插入物流信息,按照正常的逻辑来看,上面的SQL也没有问题。但是请仔细想想!实际的项目中,这三组SQL是会由客户端(Java线程)一条条发过来的,假设执行到「增加订单记录」时,Java程序那边抛出了异常,会出现什么问题呢?乍一想似乎没问题,但仔细一想:Java线程执行时出现异常会导致线程执行中断。因为Java线程中断了,所以线程不会再向数据库发送「增加订单详情记录、插入物流信息」的SQL,此刻再来想想这个场景,由于增加订单详情和物流信息的SQL都未发送过来,因此必然也不会执行,但此时库存已经扣了,用户钱也付了,但却没有订单和物流信息,这引发的后果估计老板都能杀个程序员祭天了......其实上面列举的这个案例,在数据库中被称之为事务问题,接下来一起聊一聊。一、事务的ACID原则 什么是事务呢?事务通常是由一个或一组SQL组成的,组成一个事务的SQL一般都是一个业务操作,例如前面聊到的下单:「扣库存数量、增加订单详情记录、插入物流信息」,这一组SQL就可以组成一个事务。而数据库的事务一般也要求满足ACID原则,ACID是关系型数据库实现事务机制时必须要遵守的原则。ACID主要涵盖四条原则,即: A/Atomicity:原子性 C/Consistency:一致性 I/Isolation:独立性/隔离性 D/Durability:持久性 那这四条原则分别是什么意思呢?接下来一起聊一聊。1.1、Atomicity原子性 原子性这个概念,在之前《并发编程系列-JMM内存模型》时曾初次提到过,而在MySQL中原子性的含义也大致相同,指组成一个事务的一组SQL要么全部执行成功,要么全部执行失败,事务中的一组SQL会被看成一个不可分割的整体,当成一个操作看待。好比事务A由①、②、③条SQL组成,那这一个事务中的三条SQL必须全部执行成功,只要其中任意一条执行失败,例如②执行时出现异常了,此时就会导致事务A中的所有操作全部失败。1.2、Consistency一致性 一致性也比较好理解,也就是不管事务发生的前后,MySQL中原本的数据变化都是一致的,也就是DB中的数据只允许从一个一致性状态变化为另一个一致性状态。这句话似乎听起来有些绕,不太好理解对嘛?简单解释一下就是:一个事务中的所有操作,要么一起改变数据库中的数据,要么都不改变,对于其他事务而言,数据的变化是一致的,上栗子:假设此时有一个事务A,这个事务隶属于一个下单操作,由「⓵扣库存数量、⓶增加订单详情记录、⓷插入物流信息」三这条SQL操作组成。一致性的含义是指:在这个事务执行前,数据库中的数据是处于一致性状态的,而SQL执行完成之后事务提交,数据库中的数据依旧处于一个“一致性”状态,也就是库存数量+订单数量永远是等于最初的库存总数的,比如原本的总库存是10000个,此时库存剩余8888个,那也就代表着必须要有1112条订单数据才行。这也就是前面说的:“事务发生的前后,MySQL中原本的数据变化都是一致的”,这句话的含义,不可能库存减了,但订单没有增加,这样就会导致数据库整体数据出现不一致。如果出现库存减了,但订单没有增加的情况,就代表着事务执行过程中出现了异常,此时MySQL就会利用事务回滚机制,将之前减的库存再加回去,确保数据的一致性。但来思考一个问题,如果事务执行过程中,刚减完库存后,MySQL所在的服务器断电了咋整?似乎无法利用事务回滚机制去确保数据一致性了撒?对于这点大可不必担心,因为MySQL宕机重启后,会通过分析日志的方式恢复数据,确保一致性(对于这点稍后再细聊)。1.3、Isolation独立性/隔离性 简单理解原子性和一致性后,再来看看ACID中的隔离性,在有些地方也称之为独立性,意思就是指多个事务之间都是独立的,相当于每个事务都被装在一个箱子中,每个箱子之间都是隔开的,相互之间并不影响,同样上个栗子:假设数据库的库存表中,库存数量剩余8888个,此时有A、B两个并发事务,这两个事务都是相同的下单操作,由「⓵扣库存数量、增⓶加订单详情记录、⓷插入物流信息」三这条SQL操作组成。此时A、B两个事务一起执行,同一时刻执行减库存的SQL,因此这里是并发执行的,那两个事务之间是否会互相影响,导致扣的是同一个库存呢?答案是不会,ACID原则中的隔离性保障了并发事务的顺序执行,一个未完成事务不会影响另外一个未完成事务。隔离性在底层是如何实现的呢?基于MySQL的锁机制和MVCC机制做到的(后续《MySQL事务与锁原理篇》再详细去讲)。1.4、Durability持久性 相较于之前的原子性、一致性、隔离性来说,持久性是ACID原则中最容易理解的一条,持久性是指一个事务一旦被提交,它会保持永久性,所更改的数据都会被写入到磁盘做持久化处理,就算MySQL宕机也不会影响数据改变,因为宕机后也可以通过日志恢复数据。也就相当于你许下一个诺言之后,那你无论遇到什么情况都会保证做到,就算遇到山水洪灾、地球毁灭、宇宙爆炸.....任何情况也好,你都会保证完成你的诺言为止。二、MySQL的事务机制综述 刚刚说到的ACID原则是数据库事务的四个特性,也可以理解为实现事务的基础理论,那接下来一起看看MySQL所提供的事务机制。在MySQL默认情况下,一条SQL会被视为一个单独的事务,同时也无需咱们手动提交,因为默认是开启事务自动提交机制的,如若你想要将多条SQL组成一个事务执行,那需要显式的通过一些事务指令来实现。2.1、手动管理事务在MySQL中,提供了一系列事务相关的命令,如下: start transaction | begin | begin work:开启一个事务 commit:提交一个事务 rollback:回滚一个事务 当需要使用事务时,可以先通过start transaction命令开启一个事务,如下:-- 开启一个事务 start transaction; -- 第一条SQL语句 -- 第二条SQL语句 -- 第三条SQL语句 -- 提交或回滚事务 commit || rollback; 对于上述MySQL手动开启事务的方式,相信大家都不陌生,但大家有一点应该会存在些许疑惑:事务是基于当前数据库连接而言的,而不是基于表,一个事务可以由操作不同表的多条SQL组成,这句话什么意思呢?看下图:上面画出了两个数据库连接,假设连接A中开启了一个事务,那后续过来的所有SQL都会被加入到一个事务中,也就是图中连接A,后面的SQL②、SQL③、SQL④、SQL⑤这五条都会被加入到一个事务中,只要在未曾收到commit/rollback命令之前,这个连接来的所有SQL都会加入到同一个事务中,因此对于这点要牢记,开启事务后一定要做提交或回滚处理。不过在连接A中开启事务,是不会影响连接B的,这也是我说的:事务是基于当前数据库连接的,每个连接之间的事务是具备隔离性的,比如上个真实栗子~此时先打开两个cmd命令行,然后用命令连接MySQL,或者也可以用Navicat、SQLyog等数据库可视化工具,新建两个查询,如下:这里插个小偏门知识:当你在Navicat、SQLyog这类可视化工具中,新建一个查询时,本质上它就是给你建立了一个数据库连接,每一个新查询都是一个新的连接。然后开始在两个查询中编写对应的SQL命令,先在查询窗口①中开启一个事务:-- 先查询一次表数据 SELECT * FROM `zz_users`; +---------+-----------+----------+----------+---------------------+ | user_id | user_name | user_sex | password | register_time | +---------+-----------+----------+----------+---------------------+ | 1 | 熊猫 | 女 | 6666 | 2022-08-14 15:22:01 | | 2 | 竹子 | 男 | 1234 | 2022-09-14 16:17:44 | | 3 | 子竹 | 男 | 4321 | 2022-09-16 07:42:21 | | 4 | 1111 | 男 | 8888 | 2022-09-17 23:48:29 | +---------+-----------+----------+----------+---------------------+ -- 开启事务 start transaction; -- 修改 ID=4 的姓名为:黑熊 update `zz_users` set `user_name` = "黑熊" where `user_id` = 4; -- 删除 ID=1 的行数据 delete from `zz_users` where `user_id` = 1; -- 再次查询一次数据 SELECT * FROM `zz_users`; +---------+-----------+----------+----------+---------------------+ | user_id | user_name | user_sex | password | register_time | +---------+-----------+----------+----------+---------------------+ | 2 | 竹子 | 男 | 1234 | 2022-09-14 16:17:44 | | 3 | 子竹 | 男 | 4321 | 2022-09-16 07:42:21 | | 4 | 黑熊 | 男 | 8888 | 2022-09-17 23:48:29 | +---------+-----------+----------+----------+---------------------+ 观察上面的结果,对比开启事务前后的的表数据查询,在事务中分别修改、删除一条数据后,再次查询表数据时会观察到表数据已经变化,此时再去查询窗口②中查询表数据:SELECT * FROM `zz_users`; +---------+-----------+----------+----------+---------------------+ | user_id | user_name | user_sex | password | register_time | +---------+-----------+----------+----------+---------------------+ | 1 | 熊猫 | 女 | 6666 | 2022-08-14 15:22:01 | | 2 | 竹子 | 男 | 1234 | 2022-09-14 16:17:44 | | 3 | 子竹 | 男 | 4321 | 2022-09-16 07:42:21 | | 4 | 1111 | 男 | 8888 | 2022-09-17 23:48:29 | +---------+-----------+----------+----------+---------------------+ 在查询窗口②中,也就相当于在第二个连接中查询数据时,会发现第一个连接(窗口①)改变的数据并未影响到第二个连接,啥原因呢?这是因为窗口①中还未提交事务,所以第一个连接改变的数据不会影响第二个连接。其实具体的原因是由于MySQL事务的隔离机制造成的,但对于这点后续再去分析。此时在查询窗口①中,输入rollback命令,让当前事务回滚:-- 回滚当前连接中的事务 rollback; -- 再次查询表数据 SELECT * FROM `zz_users`; +---------+-----------+----------+----------+---------------------+ | user_id | user_name | user_sex | password | register_time | +---------+-----------+----------+----------+---------------------+ | 1 | 熊猫 | 女 | 6666 | 2022-08-14 15:22:01 | | 2 | 竹子 | 男 | 1234 | 2022-09-14 16:17:44 | | 3 | 子竹 | 男 | 4321 | 2022-09-16 07:42:21 | | 4 | 1111 | 男 | 8888 | 2022-09-17 23:48:29 | +---------+-----------+----------+----------+---------------------+ 结果很明显,当事务回滚后,之前所做的数据更改操作全部都会撤销,恢复到事务开启前的表数据。当然,如果不手动开启事务,执行下述这条SQL会发生什么情况呢?update `zz_users` set `user_name` = "黑熊" where `user_id` = 4; 会直接修改表数据,并且其他连接可见,因为MySQL默认将一条SQL视为单个事务,同时默认开启自动提交事务,也就是上面这条SQL执行完了之后就会自动提交。-- 查看 自动提交事务 是否开启 SHOW VARIABLES LIKE 'autocommit'; -- 关闭或开启自动提交 SET autocommit = 0|1|ON|OFF; 上述的[0/ON]是相同的意思,表示开启自动提交,[1/OFF]则表示关闭自动提交。2.2、事务回滚点 在上面简单阐述了事务的基本使用,但假设目前有一个事务,由很多条SQL组成,但是我想让其中一部分执行成功后,就算后续SQL执行失败也照样提交,这样可以做到吗?从前面的理论上来看,一个事务要么全部执行成功,要么全部执行失败,似乎做不到啊,但实际上是可以做到的,这里需要利用事务的回滚点机制。在某些SQL执行成功后,但后续的操作有可能成功也有可能失败,但不管成功亦或失败,你都想让前面已经成功的操作生效时,此时就可在当前成功的位置设置一个回滚点。当后续操作执行失败时,就会回滚到该位置,而不是回滚整个事务中的所有操作,这个机制则称之为事务回滚点。在MySQL中提供了两个关于事务回滚点的命令: savepoint point_name:添加一个事务回滚点 rollback to point_name:回滚到指定的事务回滚点 以前面的案例来演示效果,如下:-- 先查询一次用户表 SELECT * FROM `zz_users`; -- 开启事务 start transaction; -- 修改 ID=4 的姓名为:黑熊 update `zz_users` set `user_name` = "黑熊" where `user_id` = 4; -- 添加一个事务回滚点:update_name savepoint update_name; -- 删除 ID=1 的行数据 delete from `zz_users` where `user_id` = 1; -- 回滚到 update_name 这个事务点 rollback to update_name; -- 再次查询一次数据 SELECT * FROM `zz_users`; -- 提交事务 COMMIT; 上述代码中开启了一个事务,事务中总共修改和删除两条SQL组成,然后在修改语句后面添加了一个事务回滚点update_name,在删除语句后回滚到了前面添加的回滚点。但要注意:回滚到事务点后不代表着事务结束了,只是事务内发生了一次回滚,如果要结束当前这个事务,还依旧需要通过commit|rollback;命令处理。其实借助事务回滚点,可以很好的实现失败重试,比如对事务中的每个SQL添加一个回滚点,当执行一条SQL时失败了,就回滚到上一条SQL的事务点,接着再次执行失败的SQL,反复执行到所有SQL成功为止,最后再提交整个事务。当然,这个只是理论上的假设,实际业务中不要这么干~2.3、MySQL事务的隔离机制 OK~,在前面做的小测试中,咱们会发现不同的数据库连接中,一个连接的事务并不会影响其他连接,当时也稍微的提过一嘴:这是基于事务隔离机制实现的,那接下来重点聊一聊MySQL的事务隔离机制。其实在MySQL中,事务隔离机制分为了四个级别: ①Read uncommitted/RU:读未提交 ②Read committed/RC:读已提交 ③Repeatable read/RR:可重复读 ④Serializable:序列化/串行化 上述四个级别,越靠后并发控制度越高,也就是在多线程并发操作的情况下,出现问题的几率越小,但对应的也性能越差,MySQL的事务隔离级别,默认为第三级别:Repeatable read可重复读,但如若想要真正理解这几个隔离级别,得先明白几个因为并发操作造成的问题。2.3.1、脏读、幻读、不可重复读问题数据库的脏读问题首先来看看脏读,脏读的意思是指一个事务读到了其他事务还未提交的数据,也就是当前事务读到的数据,由于还未提交,因此有可能会回滚,如下:比如上图中,DB连接①/事务A正在执行下单业务,目前扣减库存、增加订单两条SQL已经完成了,恰巧此时DB连接②/事务B跑过来读取了一下库存剩余数量,就将事务A已经扣减之后的库存数量读回去了。但好巧不巧,事务A在添加物流信息时,执行异常导致事务A全部回滚,也就是原本扣的库存又会增加回去。在个案例中,事务A先扣减了库存,然后事务回滚时又加了回去,但连接②已经将扣减后的库存数量读回去操作了,这个过程就被称为数据库脏读问题。这个问题很严重,会导致整个业务系统出现问题,数据最终错乱。数据库的不可重复读问题再来看看不可重复读问题,不可重复读问题是指在一个事务中,多次读取同一数据,先后读取到的数据不一致,如下:你没看错,就是对前面那张图稍微做了一点改造,事务A执行下单业务时,因为添加物流信息的时候出错了,导致整个事务回滚,事务回滚完成后,事务A就结束了。但事务B却并未结束,在事务B中,在事务A执行时读取了一次剩余库存,然后在事务回滚后又读取了一次剩余库存,仔细想想:B事务第一次读到的剩余库存是扣减之后的,第二次读到的剩余库存则是扣减之前的(因为A事务回滚又加回去了)。在上述这个案例中,同一个事务中读取同一数据,结果却并不一致,也就说明了该数据存在不可重复读问题,这样说似乎有些绕,那再结合可重复读来一起理解:可重复读的意思是:在同一事务中,不管读取多少次,读到的数据都是相同的。结合上述可重复读的定义,再去理解不可重复读问题会容易很多,重点是理解可重复、不可重复这个词义,为了更形象化一点,举个生活中的案例:一张卫生纸,我先拿去擦了一下桌子上的污水渍,然后又放回了原位,当我想上厕所再次拿起时,它已经无法使用了,这就代表着一张卫生纸是不可重复使用的。一个大铁锤,我先拿去敲一下松掉的桌腿,然后放回了原位,当我又想敲一下墙上的钉子再次拿起时,这个大铁锤是没有发生任何变化的,可以再次用来敲钉子,这就代表大铁锤是可以重复使用的。相信结合这两个栗子,更能让你明白可重复与不可重复的概念定义。数据库的幻读问题对于幻读的解释在网上也有很多资料,但大部分资料是这样描述幻读问题的:幻读:指同一个事务内多次查询返回的结果集不一样。比如同一个事务A,在第一次查询表的数据行数时,发现表中有n条行记录,但是第二次以同等条件查询时,却发现有n+1条记录,这就好像产生了幻觉。这个说法实际上并不严谨,第一次读和第二次读同一数据,结果集并不相同,这其实属于一个不可重复读的问题,而并非幻读问题。那接下来举例说明一下什么叫做真正的幻读问题,先上图:做过电商业务的小伙伴都清楚,一般用户购买商品后付的钱会先冻结在平台上,然后由平台在固定的时间内结算用户款,例如七天一结算、半月一结算等方式,在结算业务中通常都会涉及到核销处理,也就是将所有为「已签收状态」的订单改为「已核销状态」。此时假设连接①/事务A正在执行「半月结算」这个工作,那首先会读取订单表中所有状态为「已签收」的订单,并将其更改为「已核销」状态,然后将用户款打给商家。但此时恰巧,某个用户的订单正好到了自动确认收货的时间,因此在事务A刚刚改完表中订单的状态时,事务B又向表中插入了一条「已签收状态」的订单并提交了,当事务A完成打款后,再次查询订单表,结果会发现表中还有一条「已签收状态」的订单数据未结算,这就好像产生了幻觉一样,这才是真正的幻读问题。当然,这样讲似乎还不是那么令人理解,再举个更通俗易懂的栗子,假设此时平台要升级,用户表中的性别字段,原本是以「男、女」的形式保存数据,现在平台升级后要求改为「0、1」代替。因此事务A开始更改表中所有数据的性别字段,当负责执行事务A的线程正在更改最后一条表数据时,此时事务B来了,正好向用户表中插入了一条「性别=男」的数据并提交了,然后事务A改完原本的最后一条数据后,当再次去查询用户表时,结果会发现表中依旧还存在一条「性别=男」的数据,似乎又跟产生了幻觉一样。经过上述这两个案例,大家应该能够理解真正的幻读问题,发生幻读问题的原因是在于:另外一个事务在第一个事务要处理的目标数据范围之内新增了数据,然后先于第一个事务提交造成的问题。数据库脏写问题其实除开三个读的问题外,还有有一个叫做脏写的问题,也就是多个事务一起操作同一条数据,例如两个事务同时向表中添加一条ID=88的数据,此时就会造成数据覆盖,或者主键冲突的问题,这个问题也被称之为更新丢失问题。2.3.2、事务的四大隔离级别在上面连续讲了脏读、不可重复读以及幻读三个问题,那这些问题该怎么解决呢?其实四个事务隔离级别,解决的实际问题就是这三个,因此一起来看看各级别分别解决了什么问题: ①读未提交:处于该隔离级别的数据库,脏读、不可重复读、幻读问题都有可能发生。 ②读已提交:处于该隔离级别的数据库,解决了脏读问题,不可重复读、幻读问题依旧存在。 ③可重复读:处于该隔离级别的数据库,解决了脏读、不可重复读问题,幻读问题依旧存在。 ④序列化/串行化:处于该隔离级别的数据库,解决了脏读、不可重复读、幻读问题都不存在。 前面提到过,MySQL默认是处于第三级别的,可以通过如下命令查看目前数据库的隔离级别:-- 查询方式① SELECT @@tx_isolation; -- 查询方式② show variables like '%tx_isolation%'; +---------------+-----------------+ | Variable_name | Value | +---------------+-----------------+ | tx_isolation | REPEATABLE-READ | +---------------+-----------------+ 其实数据库不同的事务隔离级别,是基于不同类型、不同粒度的锁实现的,因此想要真正搞懂隔离机制,还需要弄明白MySQL的锁机制,事务与锁机制二者之间本身就是相辅相成的关系,锁就是为了解决并发事务的一些问题而存在的,但对于锁的内容在后续的《MySQL锁篇》再细聊,这里就简单概述一下。这里先说明一点,事务是基于数据库连接的,数据库连接在《MySQL架构篇》中曾说过:数据库连接本身会有一条工作线程来维护,也就是说事务的执行本质上就是工作线程在执行,因此所谓的并发事务也就是指多条线程并发执行。多线程其实是咱们的老朋友了,在之前的《并发编程系列》中,几乎将多线程的底裤都翻出来了,因此结合多线程角度来看,脏读、不可重复读、幻读这一系列问题,本质上就是一些线程安全问题,因此需要通过锁来解决,而根据锁的粒度、类型,又分出了不同的事务隔离级别。读未提交级别这种隔离级别是基于「写互斥锁」实现的,当一个事务开始写某一个数据时,另外一个事务也来操作同一个数据,此时为了防止出现问题则需要先获取锁资源,只有获取到锁的事务,才允许对数据进行写操作,同时获取到锁的事务具备排他性/互斥性,也就是其他线程无法再操作这个数据。但虽然这个级别中,写同一数据时会互斥,但读操作却并不是互斥的,也就是当一个事务在写某个数据时,就算没有提交事务,其他事务来读取该数据时,也可以读到未提交的数据,因此就会导致脏读、不可重复读、幻读一系列问题出现。但是由于在这个隔离级别中加了「写互斥锁」,因此不会存在多个事务同时操作同一数据的情况,因此这个级别中解决了前面说到的脏写问题。读已提交级别在这个隔离级别中,对于写操作同样会使用「写互斥锁」,也就是两个事务操作同一事务时,会出现排他性,而对于读操作则使用了一种名为MVCC多版本并发控制的技术处理,也就是有事务中的SQL需要读取当前事务正在操作的数据时,MVCC机制不会让另一个事务读取正在修改的数据,而是读取上一次提交的数据(也就是读原本的老数据)。也就是在这个隔离级别中,基于同一条数据而言,对于写操作会具备排他性,对于读操作则只能读已提交事务的数据,不会读取正在操作但还未提交的事务数据,为了理解还是简单的说一下其过程,同样有两个事务A、B。事务A的主要工作是负责更新ID=1的这条数据,事务B中则是读取ID=1的这条数据。 此时当A正在更新数据但还未提交时,事务B开始读取数据,此时MVCC机制则会基于表数据的快照创建一个ReadView,然后读取原本表中上一次提交的老数据。然后等事务A提交之后,事务B再次读取数据,此时MVCC机制又会创建一个新的ReadView,然后读取到最新的已提交的数据,此时事务B中两次读到的数据并不一致,因此出现了不可重复读问题。当然,对于MVCC机制以及锁机制这里暂时先不展开叙述,后续会开单章讲解。可重复读级别在这个隔离级别中,主要就是解决上一个级别中遗留的不可重复读问题,但MySQL依旧是利用MVCC机制来解决这个问题的,只不过在这个级别的MVCC机制会稍微有些不同。在读已提交级别中,一个事务中每次查询数据时,都会创建一个新的ReadView,然后读取最近已提交的事务数据,因此就会造成不可重复读的问题。而在可重复读级别中,则不会每次查询时都创建新的ReadView,而是在一个事务中,只有第一次执行查询会创建一个ReadView,在这个事务的生命周期内,所有的查询都会从这一个ReadView中读取数据,从而确保了一个事务中多次读取相同数据是一致的,也就是解决了不可重复读问题。虽然在这个隔离级别中,解决了不可重复读问题,但依旧存在幻读问题,也就是事务A在对表中多行数据进行修改,比如前面的举例,将性别「男、女」改为「0、1」,此时事务B又插入了一条性别为男的数据,当事务A提交后,再次查询表时,会发现表中依旧存在一条性别为男的数据。序列化/串行化级别这个隔离级别是最高的级别,处于该隔离级别的MySQL绝不会产生任何问题,因为从它的名字上就可以得知:序列化意思是将所有的事务按序排队后串行化处理,也就是操作同一张表的事务只能一个一个执行,事务在执行前需要先获取表级别的锁资源,拿到锁资源的事务才能执行,其余事务则陷入阻塞,等待当前事务释放锁。但这种隔离级别会导致数据库的性能直线下降,毕竟相当于一张表上只能允许单条线程执行了,虽然安全等级最高,可以解决脏写、脏读、不可重复读、幻读等一系列问题,但也是代价最高的,一般线上很少使用。这种隔离级别解决问题的思想很简单,之前我们分析过,产生一系列问题的根本原因在于:多事务/多线程并发执行导致的,那在这个隔离级别中,直接将多线程化为了单线程,自然也就从根源上避免了问题产生。是不是非常“银杏花”,虽然我解决不了问题,但我可以直接解决制造问题的人。略微提一嘴:其实在RR级别中也可以解决幻读问题,就是使用临键锁(间隙锁+行锁)这种方式来加锁,但具体的还是放在《MySQL锁篇》详细阐述。2.3.3、事务隔离机制的命令简单认识MySQL事务隔离机制后,接着来看看一些关于事务隔离机制的命令:-- 方式①:查询当前数据库的隔离级别 SELECT @@tx_isolation; -- 方式②:查询当前数据库的隔离级别 show variables like '%tx_isolation%'; -- 设置隔离级别为RU级别(当前连接生效) set transaction isolation level read uncommitted; -- 设置隔离级别为RC级别(全局生效) set global transaction isolation level read committed; -- 设置隔离级别为RR级别(当前连接生效) -- 这里和上述的那条命令作用相同,是第二种设置的方式 set tx_isolation = 'repeatable-read'; -- 设置隔离级别为最高的serializable级别(全局生效) set global.tx_isolation = 'serializable'; 上述实际上一眼就能看懂,唯一要注意的在于:如果想要让设置的隔离级别在全局生效,一定要记得加上global关键字,否则生效范围是当前会话,也就是针对于当前数据库连接有效,在其他连接中依旧是原本的隔离级别。三、MySQL的事务实现原理 到这里为止,一些MySQL事务相关的概念和基础就已经讲明白了,现在重点来聊一聊MySQL事务究竟是怎么实现的呢?先把结论抛出来:MySQL的事务机制是基于日志实现的。为什么是基于日志实现的呢?一起来展开聊一聊。3.1、正常SQL的事务机制 在前面聊到过的一点:MySQL默认开启事务的自动提交,并且将一条SQL视为一个事务。那MySQL在何种情况下会将事务自动提交呢?什么情况下又会自动回滚呢?想要弄明白这个问题,首先得回顾一下之前讲过的《SQL执行篇-写入SQL的执行流程》,在讲写入类型SQL的执行流程时,曾讲过一点:任意一条写SQL的执行都会记录三个日志:undo-log、redo-log、bin-log。 undo-log:主要记录SQL的撤销日志,比如目前是insert语句,就记录一条delete日志。 redo-log:记录当前SQL归属事务的状态,以及记录修改内容和修改页的位置。 bin-log:记录每条SQL操作日志,只要是用于数据的主从复制与数据恢复/备份。 在写SQL执行记录的三个日志中,bin-log暂且不需要关心,这个跟事务机制没关系,重点是undo-log、redo-log这两个日志,其中最重要的是redo-log这个日志。redo-log是一种WAL(Write-ahead logging)预写式日志,在数据发生更改之前会先记录日志,也就是在SQL执行前会先记录一条prepare状态的日志,然后再执行数据的写操作。但要注意:MySQL是基于磁盘的,但磁盘的写入速度相较内存而言会较慢,因此MySQL-InnoDB引擎中不会直接将数据写入到磁盘文件中,而是会先写到BufferPool缓冲区中,当SQL被成功写入到缓冲区后,紧接着会将redo-log日志中相应的记录改为commit状态,然后再由MySQL刷盘机制去做具体的落盘操作。因为默认情况下,一条SQL会被当成一个事务,数据写入到缓冲区后,就代表执行成功,因此会自动修改日志记录为commit状态,后续则会由MySQL的后台线程执行刷盘动作。举个伪逻辑的例子,例如下述这条插入SQL的执行过程大致如下:-- 先记录一条状态为 prepare 的日志 -- 然后执行SQL,在缓冲区中更改对应的数据 INSERT INTO `zz_users` VALUES(5,"黑竹","男","9999","2022-09-24 23:48:29"); -- 写入缓冲区成功后,将日志记录改为 commit状态 -- 返回 [Affected rows: 1],MySQL后台线程执行刷盘动作 一条SQL语句组成的事务,其执行过程是不是很容易理解~,接着来看看手动开启事务的实现。3.2、多条SQL的事务机制先把前面的案例搬下来,如下:-- 开启事务 start transaction; -- 修改 ID=4 的姓名为:黑熊(原本user_name = 1111) update `zz_users` set `user_name` = "黑熊" where `user_id` = 4; -- 删除 ID=1 的行数据 delete from `zz_users` where `user_id` = 1; -- 提交事务 COMMIT; 比如这段SQL代码执行的过程又是啥样的呢?一起来瞧一瞧:①当MySQL执行时,碰到start transaction;的命令时,会将后续所有写操作全部先关闭自动提交机制,也就是后续的所有写操作,不管有没有成功都不会将日志记录修改为commit状态。②先在redo-log中为第一条SQL语句,记录一条prepare状态的日志,然后再生成对应的撤销日志并记录到undo-log中,然后执行SQL,将要写入的数据先更新到缓冲区。③再对第二条SQL语句做相同处理,如果有更多条SQL则逐条依次做相同处理..... ,这里简单的说一下撤销日志长啥样,大致如下:-- 第一条修改SQL的撤销日志(将修改的姓名字段从 黑熊 改回 1111) update `zz_users` set `user_name` = "1111" where `user_id` = 4; -- 第二条删除SQL的撤销日志(将删除的行数据再次插入) INSERT INTO `zz_users` VALUES(1,"熊猫","女","6666","2022-08-14 15:22:01"); ④直到碰到了rollback、commit命令时,再对前面的所有写SQL做相应处理:如果是commit提交事务的命令,则先将当前事务中,所有的SQL的redo-log日志改为commit状态,然后由MySQL后台线程做刷盘,将缓冲区中的数据落入磁盘存储。如果是rollback回滚事务的命令,则在undo-log日志中找到对应的撤销SQL执行,将缓冲区内更新过的数据全部还原,由于缓冲区的数据被还原了,因此后台线程在刷盘时,依旧不会改变磁盘文件中存储的数据。OK~,其实事务机制的底层实现也并不麻烦,稍微一推导、一思考就能想明白的道理。当然,大家有兴趣的再去推导一下:事务撤销点是怎么实现的呢?其实也并不难的,略加思考即可以得到答案。3.3、事务的恢复机制 现在再来思考一个问题,有没有这么一种可能呢?也就是当SQL执行时,数据还没被刷写到磁盘中,结果数据库宕机了,那数据是不是就丢了啊?毕竟本地磁盘中的数据,在MySQL重启后依旧存在,但缓冲区中还未被刷到磁盘的数据呢?因为缓冲区位于内存中,所以里面的数据重启是不会存在的撒?对于这个问题呢实际上并不需要担心,因为前面聊到过redo-log是一种预写式日志,会先记录日志再去更新缓冲区中的数据,所以就算缓冲区的数据未被刷写到磁盘,在MySQL重启时,依旧可以通过redo-log日志重新恢复未落盘的数据,从而确保数据的持久化特性。当然,有人或许又会问:那如果在记录redo-log日志时,MySQL芭比Q了咋整?如果遇到了这个问题呢,首先得恭喜你,你的运气属于很棒,能碰到这个问题的几率足够你买彩票中五百万了~玩笑归玩笑,现在回归话题本身,这个问题总不能让它存在是不?毕竟有这个问题对于系统而言也是个隐患啊,但仔细一思考,其实这个问题不必多虑,为啥?推导一下。首先看看前面的那种情况:数据被更新到缓冲区但没刷盘,然后MySQL宕机了,MySQL会通过日志恢复数据。这里要注意的是:数据被更新到缓冲区代表着SQL执行成功了,此时客户端会收到MySQL返回的写入成功提示,只是没有落盘而言,所以MySQL重启后只需要再次落盘即可。但如果在记录日志的时候MySQL宕机了,这代表着SQL都没执行成功,SQL没执行成功的话,MySQL也不会向客户端返回任何信息,因为MySQL一直没返回执行结果,因此会导致客户端连接超时,而一般客户端都会有超时补偿机制的,比如会超时后重试,如果MySQL做了热备/灾备,这个重试的时间足够MySQL重启完成了,因此用户的操作依旧不会丢失(对于超时补偿机制,在各大数据库连接池中是有实现的)。但如若又有小伙伴纠结:我MySQL也没做热备/灾备这类的方案呐,此时咋整呢?如果是这样的情况,那就只能自认倒霉了,毕竟MySQL挂了一直不重启,不仅仅当前的SQL会丢失,后续平台上所有的用户操作都会无响应,这属于系统崩溃级别的灾难了,因此只能靠完善系统架构来解决。四、MySQL事务篇总结 一点点看到这里,《MySQL事务篇》也就接近了尾声,在本篇中对事务机制一点点去引出,慢慢的到事务机制的概述、并发事务的问题、事务的隔离级别、事务的实现原理等诸多方面进行了全面剖析,但大家应该也略微有些不尽兴,毕竟对于隔离级别的具体实现并未讲到,这是由于MySQL事务与锁机制之间有着千丝万缕的关系,所以在《MySQL锁篇》中会再次详细讲到事务隔离机制的。当然,由于目前是分布式/微服务架构横行的时代,所以也引出了新的问题,即分布式事务问题,这个问题又需要通过全新的事务机制去处理了,对于这点再讲完《MySQL分库分表》后,会再单开一章《分布式事务篇》去详细阐述,这里头的学问很大~再次结合undo-log、redo-log日志来看待ACID的四大特性:原子性、一致性、隔离性、持久性。 原子性要求事务中所有操作要么全部成功,要么全部失败,这点是基于undo-log来实现的,因为在该日志中会生成相应的反SQL,执行失败时会利用该日志来回滚所有写入操作。 持久性要求的是所有SQL写入的数据都必须能落入磁盘存储,确保数据不会丢失,这点则是基于redo-log实现的,具体的实现过程在前面事务恢复机制讲过。 隔离性的要求是一个事务不会受到另一个事务的影响,对于这点则是通过锁机制和MVCC机制实现的,只不过MySQL屏蔽了加锁和MVCC的细节,具体的会在后续章节中细聊。 一致性要求数据库的整体数据变化,只能从一个一致性状态变为另一个一致性状态,其实前面的原子性、持久性、隔离性都是为了确保这点而存在的。 以上文章来自[掘金]-[竹子爱熊猫]本程序使用github开源项目RSSHub提取聚合!
2022年10月18日
0 阅读
0 评论
0 点赞
2022-10-18
读完 RocketMQ 源码,我学会了如何优雅的创建线程-掘金
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时、高可靠的消息发布与订阅服务。这篇文章,笔者整理了 RocketMQ 源码中创建线程的几点技巧,希望大家读完之后,能够有所收获。1 创建单线程首先我们先温习下常用的创建单线程的两种方式: 实现 Runnable 接口 继承 Thread 类 ▍一、实现 Runnable 接口图中,MyRunnable 类实现了 Runnable 接口的 run 方法,run 方法中定义具体的任务代码或处理逻辑,而 Runnable 对象是作为线程构造函数的参数。▍二、 继承 Thread 类线程实现类直接继承 Thread ,本质上也是实现 Runnable 接口的 run 方法。2 单线程抽象类创建单线程的两种方式都很简单,但每次创建线程代码显得有点冗余,于是 RocketMQ 里实现了一个抽象类 ServiceThread 。我们可以看到抽象类中包含了如下核心方法: 定义线程名; 启动线程; 关闭线程。 下图展示了 RocketMQ 众多的单线程实现类。实现类的编程模版类似 :我们仅仅需要继承抽象类,并实现 getServiceName 和 run 方法即可。启动的时候,调用 start 方法 , 关闭的时候调用 shutdown 方法。3 线程池原理线程池是一种基于池化思想管理线程的工具,线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。JDK 中提供的 ThreadPoolExecutor 类,是我们最常使用的线程池类。 参数名 作用 corePoolSize 队列没满时,线程最大并发数 maximumPoolSizes 队列满后线程能够达到的最大并发数 keepAliveTime 空闲线程过多久被回收的时间限制 unit keepAliveTime 的时间单位 workQueue 阻塞的队列类型 threadPoolFactory 改变线程的名称、线程组、优先级、守护进程状态 RejectedExecutionHandler 超出 maximumPoolSizes + workQueue 时,任务会交给 RejectedExecutionHandler 来处理 任务的调度通过执行 execute 方法完成,方法的核心流程如下: 如果 workerCount < corePoolSize,创建并启动一个线程来执行新提交的任务。 如果 workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中。 如果 workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务。 如果 workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则根据拒绝策略来处理该任务,默认的处理方式是直接抛异常。 4 线程池封装在 RocketMQ 里 ,网络请求都会携带命令编码,每种命令映射对应的处理器,而处理器又会注册对应的线程池。当服务端 Broker 接收到发送消息命令时,都会有单独的线程池 sendMessageExecutor 来处理这种命令请求。基于 ThreadPoolExecutor 做了一个简单的封装 ,BrokerFixedThreadPoolExecutor 构造函数包含六个核心参数: 核心线程数和最大线程数相同 ,数量是:cpu 核数和 4 比较后的最小值; 空闲线程的回收的时间限制,默认 1 分钟; 发送消息队列,有界队列,默认 10000; 线程工厂 ThreadFactoryImpl ,定义了线程名前缀:SendMessageThread_ 。 RocketMQ 实现了一个简单的线程工厂:ThreadFactoryImpl,线程工厂可以定义线程名称,以及是否是守护线程 。开源项目 Cobar ,Xmemcached,Metamorphosis 中都有类似线程工厂的实现 。5 线程名很重要线程名很重要,线程名很重要,线程名很重要,重要的事情说三遍。我们看到 RocketMQ 中,无论是单线程抽象类还是多线程的封装都会配置线程名 ,因为通过线程名,非常容易定位问题,从而大大提升解决问题的效率。定位的媒介常见有两种:日志文件和堆栈记录。▍一、日志文件经常处理业务问题的同学,一定都经常与日志打交道。 查看 ERROR 日志,追溯到执行线程, 要是线程池隔离做的好,基本可以判断出哪种业务场景出了问题; 通过查看线程打印的日志,推断线程调度是否正常,比如有的定时任务线程打印了开始,没有打印结束,推论当前线程可能已经挂掉或者阻塞。 ▍二、堆栈记录jstack 是 java 虚拟机自带的一种堆栈跟踪工具 ,主要用来查看 Java 线程的调用堆栈,线程快照包含当前 java 虚拟机内每一条线程正在执行的方法堆栈的集合,可以用来分析线程问题。jstack -l 进程pid 笔者查看线程堆栈,一般关注如下几点: 当前 jvm 进程中的线程数量和线程分类是否在预期的范围内; 系统接口超时或者定时任务停止的异常场景下 ,分析堆栈中是否有锁未释放,或者线程一直等待网络通讯响应; 分析 jvm 进程中哪个线程占用的 CPU 最高。 6 总结本文是 RocketMQ 系列文章的开篇,和朋友们简单聊聊 RocketMQ 源码里创建线程的技巧。 单线程抽象类 ServiceThread使用者只需要实现业务逻辑以及定义线程名即可 ,不需要写冗余的代码。 线程池封装适当封装,定义线程工厂,并合理配置线程池参数。 线程名很重要文件日志,堆栈记录配合线程名能大大提升解决问题的效率。 RocketMQ 的多线程编程技巧很多,比如线程通讯,并发控制,线程模型等等,后续的文章会一一为大家展现。写在后面过多内容就不一一展示了, 好了,退休程序猿会持续更新,咱们下期再见~~~以上文章来自[掘金]-[退休程序猿]本程序使用github开源项目RSSHub提取聚合!
2022年10月18日
0 阅读
0 评论
0 点赞
2022-10-18
详解闲鱼推荐系统(长文收藏)-掘金
作者:闲鱼技术——序玮导语在互联网信息爆炸的今天,推荐系统是我们身边一个无法躲避的存在。在淘宝上浏览商品,在抖音上刷视频,以及无处不在的广告等等。本文探讨闲鱼商品推荐系统的同时,结合所面临的多推荐场景工程维护任务重、算法模型优化难以自动辐射多场景的痛点,介绍如何构建通用的推荐中台。背景推荐系统用户在网络上浏览时,如果能准确描述自己的需求,可以通过主动搜索来找到自己需要的信息。但是在不少情况下,用户并不一定能准确的描述自己的需求,或者用户就是来逛的,这个时候用户往往希望产品能主动把感兴趣的信息直接推送到自己面前。此时,推荐系统即发挥其中介作用,来连接用户与信息。而在互联网信息蓬勃发展的今天,每时每刻都在生产大量的信息与内容。因此,推荐系统需要解决的问题,便是从海量的候选信息中,挑选出用户最感兴趣的信息。为了完成这一挑选最优过程,典型的闲鱼商品推荐系统,涉及模块如下图所示。 数据 推荐系统本质是架构于数据之上的,这里的数据包括用户自身的年龄、性别、所属地域,用户所发布的商品、内容结构化数据,以及用户浏览过程中产生的曝光、点击、互动等行为数据。通过对海量的用户&商品&行为数据进行分析与计算处理,从而生成用户的trigger信息,生成样本数据辅助算法模型的训练,构建生成引擎索引数据表等。 用户信息 用户请求到来时,需要先识别出用户是谁,进而实现后续的个性化推荐。这里我们通常会根据用户的唯一id,查询用户中心服务系统,得到用户维度的基本信息。 trigger trigger,作为触发阶段,是推荐的源头,一般为用户历史浏览、点击、互动行为的商品,以及用户偏好等。在数据处理阶段,我们对这些trigger信息进行提炼。通过数据回流链路,最终将trigger信息存储于在线系统,在线请求服务时,根据用户唯一id实时查询出数据,参与后续的流程。 召回 上文有提到,全量的推荐候选集,数据量级往往突破千万,甚至级别。如果每一次的用户请求,我们均采用全局排序的思路,遍历整个推荐候选集,进行模式算分后,再找出topK个最佳预测的商品。这样的推荐效果可能很不错,但是对计算资源与性能要求过于苛刻,在生产环境中难以实现。因此,我们通过召回,来解决在候选集上进行全量排序的资源&性能问题。 在召回阶段,基于用户的特征与trigger信息,从全量的亿级别候选集中,挑选出万级别的商品,送入后续的算分排序阶段。而在召回手段上,主要有规则定义、协同过滤、向量召回等。 粗/精排 算分一般也分为粗排+精排两阶段。粗排针对召回的万级别商品进行模型算分,由于打分候选量级较大,这里模型网络结构上一般采用双塔结构,而不做复杂的交叉网络。有了粗排分,我们再挑选排在前面的千级别候选集,进行一轮精排模型算分。此时待打分的商品量级可控,因此这一环节通常采用更多的用户&商品&交叉特征与较为复杂的网络模型。 重排 前面的排序主要通过算法网络模型决定,而重排阶段,则会体现场景的业务诉求,做一下打散干预,比如类目打散、价格打散等。 结果返回 经过上面的链路后,推荐系统最终将topK个(一般为10~20个)最佳预测的商品,填充元信息后,返回给用户进行展示。 日志&埋点 对展现给用户的商品做日志记录与埋点,便于回收用户行为数据。基于上面的分析,一条完整的推荐链路开发维护成本并不低,其中涉及离线数据分析处理、采集样本、算法模型训练、模型&内容表索引构建、在线查询&召回&算分引擎部署、在线推荐服务开发、日志&埋点回收等。面临的问题闲鱼目前推荐场景数10+,在过去四个月新增了4个新的场景(闲鱼币,新品推荐,购后推荐,新发tab),同时更多推荐场景正在规划中(省心卖,首页tab feeds流等),这么多的tpp场景背后是闲鱼对个性化推荐的大量需求。 从工程角度来看每个新业务接入都需要从0到1搭建完整的推荐链路,除了大量重复的工作之外还伴随着不小的维护成本,如何才能降低如此众多场景的边际成本提升边际效益。 从算法视角来看,这些推荐场的算法模型都需要case by case迭代优化,如何实现模型迭代优化自动辐射到更多的场景。设计方案设计目标基于上文分析,如何在有限的成本之下快速从算法获取更多的红利是我们核心要解决的问题。因此我们期望构建这样一个推荐中台,通过一套推荐底坐支撑所有的中小推荐场,实现收敛推荐链路的同时让算法模型迭代形成规模效应。 显然,针对不同的业务场景,其对推荐策略存在不同的诉求。即使同一个场景内,也存在不同策略实验的述求。因此我们将推荐链路中的各个环节,进行了抽象,沉淀出内容池策略、特征策略、召回策略、粗排&精排&重排策略等,每一个推荐场景便可以认为是各环节策略的组合。同时,我们将策略以配置化+插件的方式对外输出。这样,当我们有新的场景接入时,不再需要去搭建完整的推荐链路,而是通过少量的配置化工作完成。最终将新场景上线的周期,由周级别降低至天级别,同时算法模型的迭代优化也能更加专注,接入流程如下图所示。整体架构如下图,是推荐中台的整体架构。整体上,我们依赖特征中心,根据用户与商品维度的特征信息灵活组合,计算产出各个场景的推荐候选池,并构建底池索引至引擎中。算法结合数据样本与底池,进行模型训练与召回数据训练,并产出模型与内容表,也回流至引擎供在线部分使用。在引擎部分,我们针对召回与排序,提供了通用召回与算分模型的同时,也定义了标准的输入输出协议,以满足业务场景定制化接入的诉求。场景的所有策略抽象,都收敛至实验平台进行管理。用户在线请求pv到来后,我们首先根据场景id路由,拉取到相关配置后,根据具体的策略内容,逐步执行推荐各环节,如下示意。推荐候选池推荐候选池作为推荐业务场景最大的区别之一,其直接决定了推荐中台的复用/通用能力。因此支持高效灵活的定义推荐候选池,是首先需要解决的问题, 而从实际来看,结合闲鱼众多推荐场景,推荐商品候选池可以看做是用户与商品维度特征的组合,如下示例。因此,我们使用特征来定义推荐候选池,通过特征的自由组合与实时计算,实现推荐候选池的大规模定义与计算,生产链路如下图所示。 这里面,闲鱼特征中心收敛了用户维度与商品维度特征(包括离线统计计算与实时产生的特征),以如下格式对外输出。有了基础特征信息,再搭配推荐候选池定义的特征表达式组合,使用数据ETL工具,将原始商品与用户特征、商品信息维表、推荐池定义维表等数据源,经过多层merge、join的操作,最终在商品维度打上场景唯一标识(这个标识是一个以逗号分隔的复合字段,便于单个商品同时满足多个候选池规则时打标),生成各场景的推荐候选池。召回引擎召回策略上,我们提供了三种索引方式,分别是i2i索引,x2i索引,深度召回,并标准化掉召回的输入与输出。对于输入,有两种通用的格式,第一种为trigger格式,引擎将以传入的trigger作为key,从i2i与x2i索引中执行kv检索与倒排检索。第二种则是针对深度召回,上游传入模型预测出的embedding向量,再经由向量引擎完成检索。输出则是召回检索得到的商品item_id与对应的召回recall_score。 目前三种方式共计10+路召回通道可供业务场景选择,每一路召回通道都枚举了一个标识。接入业务只需要配置选择用哪些召回通道即可。 i2i:根据商品积累的用户点击行为,计算item-item的用户共现点击得分,作为i2i的相似度。 x2i:这里的x可以是商品的tag、class、brand、query、pool_ids等,根据用户全域的行为构建用户偏好,对商品标题信息进行分词,以及用户的tag,class,品牌,搜索场景下对应query等,最终构建倒排索引进行检索。 深度召回:主要通过深度网络模型,来预测用户与商品的相似性。模型分别计算出用户侧向量与商品侧向量,在线检索时,根据用户侧向量,通过向量引擎完成ANN检索出topK个商品。 算分引擎算分引擎的作用,是将输入的待打分候选商品集,关联上商品特征,并结合用户的特征,通过深度网络模型的计算执行,完成候选商品集中每一个商品对该用户的个性化预测得分。这里我们提供了一个包含ctr、cvr与互动的多目标算分模型,满足了大多数场景的个性化需求。 此外,我们将算分排序模型的输入输出进行标准化,也提供了模型定制化的能力。有些场景不太适应通用的多目标模型,可遵循协议将模型接入,每一个模型具备一个唯一的标识biz_name,场景配置上选择该biz_name即可。模型存在多个目标得分,比如ctr_score、cvr_score、car_score等。而最终的得分如何计算,场景内也支持配置运算表达式与加权&降权(有些场景倾向转化,有些场景则重成交,或者满足交易抵扣的商品需要提权),来满足不同的场景要求。实验体系推荐系统迭代极快,算法工程师通常会展开很多AB实验,需要能够灵活的支持实验策略与流量调整。此外,全量用户基本比较固定,用户在不同场景,以及场景内不同实验,均需要做到互不干扰,保证实验的独立性。 在实现上,每次注册场景时,我们会同步创建实验与流量模型,并跟场景id进行绑定,确保场景之间的流量模型独立。场景内部多实验的诉求,则通过在流量模型内进一步动态分层的方式。这样场景A对应流量模型A,场景B对应流量模型B。而场景A里面,实验1按照50% vs 50%运行在流量模型A的分层1,实验2也可以按照50% vs 50%运行在流量模型A的分层2。 稳定性推荐中台,承载了闲鱼10+推荐场流量,因此对系统的稳定性和业务的高可用有极高的要求。在系统部署上,分别在中心机房(张北)和单元机房(南通)进行了异地多机房部署,确保线上单一机房故障异常时,能够通过紧急切流将流量转发至正常机房提供服务。 此外,我们对接入的业务场景,也做了逻辑隔离。分场景配置限流熔断,当某一个场景有异常或者突发流量时,能够对其快速降级&熔断,避免其余场景受到影响,保障整体的高可用。结语推荐是一项系统性工程,近年来在计算架构、模型网络结构等方面也不断演进。本文在介绍闲鱼商品推荐架构的基础上,围绕如何在有限的成本之下快速从算法获取更多的红利这一核心问题,提出通用推荐中台解决方案。 推荐中台的搭建,是工程和算法将自身能力进行沉淀的一次有效尝试,新场景只需要天级别就能完成接入,工程与算法的维护迭代也将更为专注与聚焦。目前已经接入10+场景,对比场景接入前后的效率指标,其中点击转化率提升8%以上,人均ipv提升10%以上。同时随着接入场景增多,也为平台沉淀有价值的数据标签。 此外,当前整体链路上也仍存在一些不足。在排序模型上,模型的精度还有一些欠缺,以及多场景联合建模尚未开展。在工程上,场景接入还需要开发同学介入干预,自动化程度有待提升。后续我们将持续迭代优质工程与算法能力。以上文章来自[掘金]-[闲鱼技术]本程序使用github开源项目RSSHub提取聚合!
2022年10月18日
0 阅读
0 评论
0 点赞
2022-10-18
502问题怎么排查?-掘金
本文为掘金社区首发签约文章,14天内禁止转载,14天后未获授权禁止转载,侵权必究!刚工作那会,有一次,上游调用我服务的老哥说,你的服务报"502错误了,快去看看是为什么吧"。当时那个服务里正好有个调用日志,平时会记录各种200,4xx状态码的信息。于是我跑到服务日志里去搜索了一下502这个数字,毫无发现。于是跟老哥说,"服务日志里并没有502的记录,你是不是搞错啦?"现在想来,多少有些不好意思。不知道有多少老哥是跟当时的我是一样的,这篇文章,就来聊聊502错误是什么?我们从状态码是什么开始聊起。HTTP状态码我们平时在浏览器里逛的某宝和某度,其实都是一个个前端网页。一般来说,前端并不存储太多数据,大部分时候都需要从后端服务器那获取数据。于是前后端之间需要通过TCP协议去建立连接,然后在TCP的基础上传输数据。而TCP是基于数据流的协议,传输数据时,并不会为每个消息加入数据边界,直接使用裸的TCP进行数据传输会有"粘包"问题。因此需要用特地的协议格式去对数据进行解析。于是在此基础上设计了HTTP协议。详细的内容可以看我之前写的《既然有HTTP协议,为什么还要有RPC》。比如,我想要看某个商品的具体信息,其实就是前端发的HTTP请求中传入商品的id,后端返回的HTTP响应中返回商品的价格,商店名,发货地址的信息等。这样,表面上,我们是在刷着各种网页,实际上背后正有多次HTTP消息在不断进行收发。但问题就来了,上面提到的都是正常情况,如果有异常情况呢,比如前端发的数据,根本就不是个商品id,而是一张图片,这对于后端服务端来说是不可能给出正常响应的,于是就需要设计一套HTTP状态码,用来标识这次HTTP请求响应流程是否正常。通过这个可以影响浏览器的行为。比方说一切正常,那服务端返回个200状态码,前端收到后,可以放心使用响应的数据。但如果服务端发现客户端发的东西异常,就响应个4xx状态码,意思是这是个客户端的错误,4xx里头的xx可以根据错误的类型,再细分成各种码,比如401是客户端没权限,404是客户端请求了一个根本不存在的网页。反过来,如果是服务器有问题,就返回5xx状态码。但问题就来了。服务端都有问题了,搞严重点,服务器可能直接就崩溃了,那它还怎么给你返回状态码?是的,这种情况,服务端是不可能给客户端返回状态码的。所以说,一般情况下5xx的状态码其实并不是服务器返回给客户端的。它们是由网关返回的,常见的网关,比如nginx。nginx的作用回到前后端交互数据的话题上,如果前端用户少,那后端处理起请求来,游刃有余。但随着用户越来越多,后端服务器受资源限制,cpu或者内存都可能会严重不足,这时候解决方案也很简单,多搞几台一样的服务器,这样就能将这些前端请求均摊给几个服务器,从而提升处理能力。但要实现这样的效果,前端就得知道后端具体有哪些个服务器,并一一跟他们建立TCP连接。也不是不行,但就是麻烦。但这时候如果能有个中间层挡在它们中间就好了,这样客户端只需要跟中间层连接,中间层再和服务器建立连接。于是,这个中间层就成了这帮服务器的一个代理人一样,客户端有啥事都找代理人,只管发出自己的请求,再由代理人去找某个服务器去完成响应。整个过程下来,客户端只知道自己的请求被代理人帮忙搞定了,但代理人具体找了那个服务器去完成,客户端并不知道,也不需要知道。像这种,屏蔽掉具体有哪些服务器的代理方式就是所谓的反向代理。反过来,屏蔽掉具体有哪些客户端的代理方式,就是所谓的正向代理。而这个中间层的角色,一般由nginx这类网关来充当。另外,由于背后的服务器可能性能配置各不相同,有些4核8G,有些2核4G,nginx能为它们加上不同的访问权重,权重高的多转发点请求,通过这个方式实现不同的负载均衡策略。nginx返回5xx状态码有了nginx这一中间层后,客户端从直连服务端,变成客户端直连nginx,再由nginx直连服务端。从一个TCP连接变成两个TCP连接。于是,当服务器发生异常时,nginx发送给服务器的那条TCP连接就不能正常响应,nginx在得到这一信息后,就会返回5xx错误码给客户端,也就是说5xx的报错,其实是由nginx识别出来,并返回给客户端的,服务端本身,并不会有5xx的日志信息。所以才会出现文章开头的一幕,上游收到了我服务的502报错,但我在自己的服务日志里却搜索不到这一信息。产生502的常见原因在rfc7231中有关于502错误码的官方解释是502 Bad Gateway The 502 (Bad Gateway) status code indicates that the server, while acting as a gateway or proxy, received an invalid response from an inbound server it accessed while attempting to fulfill the request. 翻译一下就是,502 (Bad Gateway) 状态代码表示服务器在充当网关或代理时,在尝试满足请求时从它访问的入站服务器接收到无效响应。汝听,人言否?这对于大部分编程小白来说,不仅没解释到问题,反而只会冒出更多的问号。比如,这上面提到的无效响应到底指的是什么?我来解释下,它其实是说,502其实是由网关代理(nginx)发出的,是因为网关代理把客户端的请求转发给了服务端,但服务端却发出了无效响应,而这里的无效响应,一般是指TCP的RST报文或四次挥手的FIN报文。四次挥手估计大家背的很熟了,所以略过,我们来重点说下RST报文是什么。RST是什么?我们都知道TCP正常情况下断开连接是用四次挥手,那是正常时候的优雅做法。但异常情况下,收发双方都不一定正常,连挥手这件事本身都可能做不到,所以就需要一个机制去强行关闭连接。RST 就是用于这种情况,一般用来异常地关闭一个连接。它是TCP包头中的一个标志位,在收到置这个标志位的数据包后,连接就会被关闭,此时接收到 RST的一方,在应用层会看到一个 connection reset 或 connection refused 的报错。而之所以发出RST报文,一般有两个常见原因。服务端过早断开连接nginx与服务端之间有一条TCP连接,在nginx将客户端请求转发给服务端时,他两之间按道理会一直保持这条连接,直到服务端将结果正常返回后,再断开连接。但如果服务端过早断开连接,而nginx却还继续发消息过去,nginx就会收到服务端内核返回的RST报文或四次挥手的FIN报文,迫使nginx那边的连接结束。过早断开连接的原因常见的有两个。第一个是,服务端设置的超时时间过短。不管是用的哪种编程语言,一般都有现成的HTTP库,服务端一般都会有几个timeout参数,比如golang的HTTP服务框架里有个写超时(WriteTimeout),假设设置了2s,那它的含义就是,服务端在收到请求后需要在2s内处理完并将结果写到响应中,如果等不到,就会将连接给断掉。比如你的接口处理时间是5s,而你的WriteTimeout却只有2s,在没等到响应写完之前,HTTP框架就会主动将连接给断开。nginx此时就有可能收到四次挥手的FIN报文(有些框架也可能发RST报文),然后断开连接,于是客户端就会收到一个502报错。遇到这种问题,将WriteTimeout的时间调大一些就好了。第二个原因,也是造成502状态码最常见的原因,就是服务端应用进程崩了(crash)。服务端崩了,也就是当前没有一个进程在监听服务器端口,而此时你却尝试向一个不存在的端口发数据,服务器的linux内核协议栈就会响应一个RST数据包。同样,这时候nginx也会给客户端一个502。在开发过程中,这种情况是最常见的。现在我们大部分的服务器都会将挂掉的服务重启,因此我们需要判断下服务是否曾经崩溃过。如果你有对服务端的cpu或者内存做过监控,可以看下CPU或内存的监控图是否出现过断崖式的突然下跌。如果有,十有八九百,就是你的服务端应用程序曾经崩溃过。除此之外你还通过下面的命令,看下进程上次的启动时间是什么时候。ps -o lstart {pid} 比如我要看的进程id是13515,命令就需要像下面这样。# ps -o lstart 13515 STARTED Wed Aug 31 14:28:53 2022 可以看到它上次的启动时间是8月31日,这个时间如果跟你印象中的操作时间有差距,那说明进程可能是崩了之后被重新拉起了。遇到这种问题,最重要的是找出崩溃的原因,崩溃的原因就多种多样了,比如,对未初始化的内存地址进行写操作,或者内存访问越界(数组arr长度明明只有2,代码却读arr[3])。这种情况几乎都是程序有代码逻辑问题,崩溃一般也会留下代码堆栈,可以根据堆栈报错去排查问题,修复之后就好了。比如下面这张图是golang的报错堆栈信息,其他语言的也类似。不打印堆栈的情况但有一些情况,有时候根本不留下堆栈。比如内存泄露导致进程占用内存越来越多,最后导致超过服务器的最大内存限制,触发OOM(out of memory), 进程直接就被操作系统kill掉。还有更隐蔽的,代码逻辑里隐藏了主动退出进程的操作。比如golang的日志打印里有个方法叫log.Fatalln(),打印完日志还会顺便执行os.Exit()直接退出进程,对源码不了解的新手很容易犯这个错。如果你很明确,你的服务没有崩过。那继续往下看。网关将请求打到了一个不存在的IP上nginx是通过配置的形式来代理多个服务器。这个配置一般是放在 /etc/nginx/nginx.conf 中。打开它,你可能会看到类似下面这样的信息。upstream xiaobaidebug.top { server 10.14.12.19:9235 weight=2; server 10.14.16.13:8145 weight=5; server 10.14.12.133:9702 weight=8; server 10.14.11.15:7035 weight=10; } 上面配置的含义是,如果客户端访问xiaobaidebug.top域名,nginx就会将客户端的请求转发到下面的4个服务器ip上,ip边上还有个weight权重,权重越高,被转发到的次数就越多。可以看出,nginx具有相当丰富的配置能力。但要注意的是,这些个文件是需要自己手动配置的。对于服务器少,且不怎么变化的情况,这当然没问题。但现在已经是云原生时代了,很多公司内部都有自己的云产品,服务自然也会上云。一般来说每次更新服务,都可能会将服务部署到一台新的机器上。而这个ip也会随着改变,难道每发布一次服务,都需要手动去nginx上改配置吗?这显然不现实。如果能在服务启动时,让服务主动将自己的ip告诉nginx,然后nginx自己生成这样的一个配置并重新加载,那事情就简单多了。为了实现这样一个服务注册的功能,不少公司都会基于nginx进行二次开发。但如果这个服务注册功能有问题,比方说服务启动后,新服务没注册上,但老服务已经被销毁了。这时候nginx还将请求打到老服务的IP上,由于老服务所在的机器已经没有这个服务了,所以服务器内核就会响应RST,nginx收到RST后回复502给客户端。要排查这种问题也不难。这个时候,你可以看下nginx侧是否有打印相关的日志,看下转发的IP端口是否符合预期。如果不符合预期,可以去找找做这个基础组件的同事,进行一波友好的交流。总结 HTTP状态码用来表示响应结果的状态,其中200是正常响应,4xx是客户端错误,5xx是服务端错误。 客户端和服务端之间加入nginx,可以起到反向代理和负载均衡的作用,客户端只管向nginx请求数据,并不关心这个请求具体由哪个服务器来处理。 后端服务端应用如果发生崩溃,nginx在访问服务端时会收到服务端返回的RST报文,然后给客户端返回502报错。502并不是服务端应用发出的,而是nginx发出的。因此发生502时,后端服务端很可能没有没有相关的502日志,需要在nginx侧才能看到这条502日志。 如果发现502,优先通过监控排查服务端应用是否发生过崩溃重启,如果是的话,再看下是否留下过崩溃堆栈日志,如果没有日志,看下是否可能是oom或者是其他原因导致进程主动退出。如果进程也没崩溃过,去排查下nginx的日志,看下是否将请求打到了某个不知名IP端口上。 最后最近原创更文的阅读量稳步下跌,思前想后,夜里辗转反侧。我有个不成熟的请求。离开广东好长时间了,好久没人叫我靓仔了。大家可以在评论区里,叫我一靓仔吗?我这么善良质朴的愿望,能被满足吗?如果实在叫不出口的话,能帮我点下关注和右下角的点赞+收藏吗?别说了,一起在知识的海洋里呛水吧以上文章来自[掘金]-[小白debug]本程序使用github开源项目RSSHub提取聚合!
2022年10月18日
0 阅读
0 评论
0 点赞
2022-10-18
✨基于Spring-Data-Elasticsearch 优雅的实现 多字段搜索 + 高亮 + 分页 + 数据同步✨-掘金
持续创作,加速成长!这是我参与「掘金日新计划 · 10 月更文挑战」的第17天,点击查看活动详情系列说明本系列文章基于我的开源微服务项目【校园博客】进行分析和讲解,所有源码均可在GitHub仓库上找到。 系列文章地址请见我的 校园博客专栏。 GitHub地址:https://github.com/stick-i/scblogs 目前项目还有很大改进和完善的空间,欢迎各位有意愿的同学参与项目贡献(尤其前端),一起学习一起进步😋。项目的技术栈主要是:后端 Java + SpringBoot + SpringCloud + Nacos + Getaway + Fegin + MybatisPlus + MySQL + Redis + ES + RabbitMQ + Minio + 七牛云OSS + Jenkins + Docker前端 Vue + ElementUI + Axios(说实话前端我不太清楚😅)前言本篇文章主要是一些对Spring-Data-Elasticsearch使用上的记录和讲解,对原理和基础知识并没有介绍,适合有一定ES基础的朋友阅读。为了给项目添加一个好的搜索功能,我去学习了一下elasticsearch。在学习elasticsearch-client的期间,发现它提供的api不太优雅,用起来也不太舒服,而且我觉得有些操作完全是可以封装在内部的,比如获取数据后,对数据转化为bean的操作;还有属性高亮,不仅设置比较麻烦,而且设置完成的高亮居然是单独在一个字段里的,需要开发者去手动的替换才行,这些操作我觉得其实都可以封装在内部的,害,个人感慨,请勿介意。然后我就去看了一下spring-data里面提供的 es 操作库,发现有很多操作都封装的比较完善,使用起来也比较优雅,于是我便使用spring-data-elasticsearch完成了这个功能,查阅了很多资料、博客、官方文档,有些地方我觉得官方文档讲的也不够详细,导致走了很多弯路,也可能是我没有找到详细的文档。为了方便大家学习和少走弯路,也便于本人日后回顾,故记录于此。本篇文章讲的内容是在项目的 /blog-service/blog-content-server 路径下,感兴趣的同学欢迎随时查看,觉得不错的话也欢迎点点star噢。技术要点 使用 copyTo 和 ElasticsearchRepository 完成的多字段搜索。 使用注解 @Highlight 和 @HighlightField 完成的高亮显示。 使用 Pageable 和 SearchPage 实现分页和高亮两不误的接口。 使用 RabbitMQ 完成 MySQL 和 elasticsearch 的数据同步。 依赖项我当前的环境: springboot 2.6.6 elasticsearch 7.12 kibana 7.12(这个不是必须的) 然后当前版本的spring默认是用的 7.15.2 的我担心和我的es不兼容,就加了个标签给它改了一下版本:7.12.1 核心依赖其实就这一个,这里面已经依赖了elasticsearch需要的一些依赖,例如 elasticsearch-rest-high=level-client。 org.springframework.boot spring-boot-starter-data-elasticsearch 然后如果跟我一样使用 RabbitMQ 做数据同步的话,还需要引用mq的依赖: org.springframework.boot spring-boot-starter-amqp com.fasterxml.jackson.core jackson-databind 配置文件这里需要配置elasticsearch的账号密码spring: elasticsearch: uris: "http://localhost:9200" username: 12345 password: 12345 核心代码实体类BlogDoc下面是我代码当中跟 es 进行交互的实体类,代码上有相关的注释,我将一些多余的、意义不大的属性删掉了,方便大家查看。package cn.sticki.blog.content.pojo; import com.fasterxml.jackson.annotation.JsonIgnore; import lombok.Data; import org.springframework.data.annotation.Id; import org.springframework.data.elasticsearch.annotations.Document; import org.springframework.data.elasticsearch.annotations.Field; import org.springframework.data.elasticsearch.annotations.FieldType; import java.util.Date; /** * Blog ES文档类型 * * @author 阿杆 * @version 1.0 * @date 2022/7/8 15:24 */ @Data @Document(indexName = "blog") public class BlogDoc { /** * 博客id */ @Id Integer id; /** * 封面图链接 */ @Field(type = FieldType.Keyword, index = false) String coverImage; /** * 标题 */ @Field(type = FieldType.Text, analyzer = "ik_max_word", copyTo = "descriptiveContent") String title; /** * 描述 */ @Field(type = FieldType.Text, analyzer = "ik_max_word", copyTo = "descriptiveContent") String description; /** * 创建时间 */ @Field(type = FieldType.Date, pattern = "uuuu-MM-dd HH:mm:ss") Date createTime; /** * 发表状态(1表示已发表、2表示未发表、3为仅自己可见、4为回收站、5为审核中) */ @Field(type = FieldType.Integer) Integer status; /** * 由其他属性copy而来,主要用于搜索功能,不需要储存数据 */ @JsonIgnore @Field(type = FieldType.Text, analyzer = "ik_max_word", ignoreFields = "descriptiveContent", excludeFromSource = true) String descriptiveContent; } 注解说明: @Document(indexName = "blog"):声明该实体类对应es中的哪个索引库。 @Id:声明该字段对应索引库当中的id。 @JsonIgnore:这个应该很熟悉吧,就是在json序列化时将对象中的一些属性忽略掉,使返回的json数据不包含该属性。 @Field(...) ,这些其实都对应es的api调用时传入的字段,有一点es基础会很容易看懂,也可以看看我写的elasticsearch专栏下的其他文章,前几篇是我学基础的时候记录的。 type = FieldType.Integer :声明字段属性,如果不写,默认为auto,就是es会帮你自动匹配成最合适的字段类型,建议还是写一下。 index = false :声明该字段不需要建立索引,一般用于不会被拿来搜索、排序、统计的字段,比如我这里写的封面图链接。 analyzer = "ik_max_word" : 声明该text字段需要使用的分词器,我这里是用的ik分词器,需要开发者去手动安装,但对中文分词比较友好。 excludeFromSource = true:翻译出来意思是“从源中排除”,应该是指这个字段的属性不会插入到es索引库当中吧,这个字段是我用来``copy_to`的,主要是搜索的时候使用,本身并不会直接存入数据,所以这个字段如果有数据,我希望插入的时候把它忽略。 copyTo = "descriptiveContent":这个就是跟es的copy_to一样,就是说把当前属性拷贝到“descriptiveContent”当中,可以拷贝多个属性到同一个字段中,便于搜索、查询。 pattern = "uuuu-MM-dd HH:mm:ss" : 声明该自定义的格式字符串,一般在type = FieldType.Date时使用。 format:跟pattern差不多,官方解释是用于定义至少一种预定义格式。如果未定义,则使用默认值*_date_optional_time和epoch_millis*。也就是只能使用给定的枚举值,不能自定义,自定义的话得用pattern。下图是谷歌翻译的官方解释: 实体类属性copy_to大家都知道,在es当中如果有多个字段需要被同时查询(比如我的博客业务,要搜索内容的时候,我会把用户输入的关键字同时拿来匹配标题和文章描述),那可以用multi_match、query_string进行多字段查询,也可以用copy_to将多个字段复制到一个新属性上再去查新属性,这几种方法都是可以的,但是copy_to它的性能会高一些,尤其是在同时要查的属性非常多的时候,这属于是一种储存换取速度的方式。copy_to的属性在上面已经讲过了,跟es的api用来起来差不多的,但是我上面的代码还写了一个descriptiveContent:/** * 由其他属性copy而来,主要用于搜索功能,不需要储存数据 */ @JsonIgnore @Field(type = FieldType.Text, analyzer = "ik_max_word", ignoreFields = "descriptiveContent", excludeFromSource = true) String descriptiveContent; 这个属性就是被cope_to到的那个属性,但实际上我们在写代码的时候并不会给它赋值或者取值或者别的怎么样,总是就是希望他尽可能透明,仅在对es时有效,因为es里是已经提前定义好这个索引库了的,es创建索引库的代码我会贴在文章最后。这是因为,后面我们要使用ElasticsearchRepository的时候,被查询的字段如果不存在于这个实体类,idea会有一个很碍眼的提示,作为强迫症患者,这就引发了我的思考,是不是我们在定义实体类的时候,要和定义索引库的时候一样给出全部的字段呢?尽管这个字段只是一个“隐身”的字段。为了把这个碍眼的提示去掉 为了让代码变得更可读一点,所以我加上了这个字段,并加了一些忽略的属性使它尽可能隐身。Mapper层(Repository)核心代码如下,具体解释和分析在下面:package cn.sticki.blog.content.mapper; import cn.sticki.blog.content.pojo.BlogDoc; import org.springframework.data.domain.Pageable; import org.springframework.data.elasticsearch.annotations.Highlight; import org.springframework.data.elasticsearch.annotations.HighlightField; import org.springframework.data.elasticsearch.annotations.HighlightParameters; import org.springframework.data.elasticsearch.core.SearchPage; import org.springframework.data.elasticsearch.repository.ElasticsearchRepository; /** * BlogRepository操作类 * 提供save、findById、findAll、count、delete、exists等接口 * * @author 阿杆 * @version 1.0 * @date 2022/7/9 10:53 */ public interface BlogRepository extends ElasticsearchRepository { /** * 通过描述内容来搜索博客 * * @param descriptiveContent 描述语句 * @param pageable 分页 * @return 博客列表 */ @SuppressWarnings("SpringDataRepositoryMethodReturnTypeInspection") @Highlight(fields = { @HighlightField(name = "title", parameters = @HighlightParameters(requireFieldMatch = false)), @HighlightField(name = "description", parameters = @HighlightParameters(requireFieldMatch = false)), }) SearchPage findByDescriptiveContent(String descriptiveContent, Pageable pageable); } 继承ElasticsearchRepository 这个其实就有点像继承BaseMapper,它会给你提供一些基础的CRUD方法,方便你直接使用,比如save、delete、find之类的。 它是个泛型类,两个参数分别是 。 在该接口下(BlogRepository)按照特殊的命名规则声明的方法,可以直接调用,不需要开发者实现接口,且它返回的内容是已经封装好的,你需要的数据会被封装在你提供的实体类里面(不用手动解析数据)。大概就是 findByXxxAndXxxOrXxx() 这个类型,具体的可以参考官网:https://docs.spring.io/spring-data/elasticsearch/docs/4.3.5/reference/html/#elasticsearch.query-methods.criterions,这里也截一点给大家看看(谷歌浏览器翻译的): 也可以使用 @Query 注解写原生的 api 请求接口,不太优雅,个人不推荐使用。 然后这里我只添加了一个方法:SearchPage findByDescriptiveContent(String descriptiveContent, Pageable pageable); 这个意思就是所通过 DescriptiveContent 属性来查询数据,后面的两个参数一个是搜索的内容,一个是分页的参数(分页需要配合支持分页的返回值才行)。这个findByXxx的Xxx属性必须是实体类里面存在的属性才可以,不然会提示错误:高亮显示@SuppressWarnings("SpringDataRepositoryMethodReturnTypeInspection") @Highlight(fields = { @HighlightField(name = "title", parameters = @HighlightParameters(requireFieldMatch = false)), @HighlightField(name = "description", parameters = @HighlightParameters(requireFieldMatch = false)), }) 使用注解 @Highlight 和 @HighlightField,来设置高亮的字段,使用 @HighlightParameters 来添加高亮的参数。我这里设置了requireFieldMatch = false,这个参数是取消只有字段匹配才给高亮的规则,这是因为我搜索的字段是由另外两个字符copyTo而来的,高亮的内容肯定是在另外两个字段里面,设置该参数可以让其他字段的高亮也展示出来。这里还有一篇高亮显示的教程文章,我讲的比较粗糙,他这个写的比较详细,贴给大家学习:https://blog.csdn.net/qq_45794678/article/details/111188548官方文档给的说明就这么点。。。怕我学会了然后教别人吗。。。分页功能通过 Pageable 做参数和 SearchPage 做返回值来完成了对分页的需求,传参的时候使用 PageRequest.of(page, size) 来创建分页参数即可。得到结果后仅需将分页的内容替换掉实体类的内容即可,并且数据里面包含有获取页码的信息的接口:Service层核心代码如下:@Service public class BlogContentServiceImpl implements BlogContentService { @Resource private BlogRepository blogRepository; /** * 搜索博客 * * @param key 搜索内容 * @param page 页码 * @param size 页大小 * @return 搜索到的结果列表 */ @Override public List searchBlog(String key, int page, int size) { // 1. 获取数据 SearchPage searchPage = blogRepository.findByDescriptiveContent( // 1.1 设置key和分页,这里是从第0页开始的,所以要-1 key,PageRequest.of(page - 1, size)); // 2. 高亮数据替换 List searchHitList = searchPage.getContent(); ArrayList blogDocList = new ArrayList(searchHitList.size()); for (SearchHit blogHit : searchHitList) { // 2.1 获取博客数据 BlogDoc blogDoc = blogHit.getContent(); // 2.2 获取高亮数据 Map fields = blogHit.getHighlightFields(); if (fields.size() > 0) { // 2.3 通过反射,将高亮数据替换到原来的博客数据中 BeanMap beanMap = BeanMap.create(blogDoc); for (String name : fields.keySet()) { beanMap.put(name, fields.get(name).get(0)); } } // 2.4 博客数据插入列表 blogDocList.add(blogDoc); } return blogDocList; } } 替换高亮数据到这里其实就只要做一件事了,因为Repository返回的数据已经帮你封装好实体类了,不需要再去json转bean了,它唯一的缺点就是,高亮数据还是得自己去做替换,所以我上面这些代码也就是做了这一件事,就是把高亮的数据替换掉原来的数据。这里我用到了 BeanMap,代码里不用写死属性名称,相对来说更优雅一点,如果有需要的话,也可以把中间这一段分离成一个单独的方法,可以提供给不同的类使用。数据同步数据同步指的是 elasticsearch 和 MySQL 的数据同步,由于我的项目做的是微服务架构,我的博客服务和博客内容服务是两个微服务(本文讲的是博客内容服务),博客服务提供文章的增删改查功能,并连接MySQL,博客内容服务提供搜索功能,并连接ES,故两者的数据需要同步。这里我使用的是RabbitMQ,主要逻辑如下: 用户新建修改或删除博客时,博客服务发送消息到MQ中,发到自己的交换机里,并指定key。 内容服务提前创建队列并绑定到博客服务的交换机中。 当内容服务接收到消息时,做出对应的操作。 核心代码如下:/** * 内容服务对博客服务的消息队列监听器 * * @author 阿杆 * @version 1.0 * @date 2022/7/10 9:32 */ @Slf4j @Component public class BlogServerListener { @Resource private BlogRepository blogRepository; @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = BLOG_EXCHANGE), value = @Queue(name = BLOG_SAVE_QUEUE), key = {BLOG_INSERT_KEY, BLOG_UPDATE_KEY} )) public void saveListener(BlogDoc blogDoc) { log.debug("save blogDoc,{}", blogDoc); blogRepository.save(blogDoc); } @RabbitListener(bindings = @QueueBinding( exchange = @Exchange(name = BLOG_EXCHANGE), value = @Queue(name = BLOG_DELETE_QUEUE), key = BLOG_DELETE_KEY )) public void deleteListener(Long blogId) { log.debug("delete blog ,id->{}", blogId); blogRepository.deleteById(blogId); } } 其实可以看出,通过Repository来实现这些操作都是很简单的。需要注意的是,这里的save操作,是ES的全量更新,所以发送过来的数据,一定要是完整的数据,否则会导致部分字段丢失。然后发送消息的大概就是代码是:rabbitTemplate.convertAndSend(BLOG_EXCHANGE, BLOG_UPDATE_KEY, blog); MQ序列化配置这里RabbitMQ的序列化配置我也贴一下,这个可以让MQ消息变成json格式的。package cn.sticki.common.amqp.autoconfig; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 阿杆 * @version 1.0 * @date 2022/6/25 18:01 */ @Configuration public class AmqpMessageConverterConfig { @Bean public MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } } 后记本篇文章主要使用了 ElasticsearchRepository 和相关注解来完成了一些常有的需求,比较优雅(个人认为)的实现了查询分页和高亮的功能(网上找到的教程都没有把分页和高亮一起适配的🤧)。但如果有更为复杂的需求,可能还是需要使用ElasticsearchRestTemplate来完成。 官网:https://docs.spring.io/spring-data/elasticsearch/docs/4.3.5/reference/html/#elasticsearch.operations.resttemplate以上文章来自[掘金]-[阿杆]本程序使用github开源项目RSSHub提取聚合!
2022年10月18日
0 阅读
0 评论
0 点赞
1
2
...
167