Java业务错误案例-2
21:解决重复代码
- 可维护性是大型项目成熟度的一个重要指标,而提升可维护性非常重要的一个手段就是减少代码重复。
21.1:模板方法消除if-else
- 假设要开发一个购物车下单的功能,针对不同用户进行不同处理:
- 普通用户需要收取运费,运费是商品价格的 10%,无商品折扣
- VIP 用户同样需要收取商品价格 10% 的快递费,但购买两件以上相同商品时,第三件开始享受一定折扣
- 内部用户可以免运费,无商品折扣
我们的目标是实现三种类型的购物车业务逻辑,把入参 Map 对象(Key 是商品 ID,Value是商品数量),转换为出参购物车类型 Cart
先实现针对普通用户的购物车处理逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
@Data public class Cart { //商品清单 private List<Item> items = new ArrayList<>(); //总优惠 private BigDecimal totalDiscount; //商品总价 private BigDecimal totalItemPrice; //总运费 private BigDecimal totalDeliveryPrice; //应付总价 private BigDecimal payPrice; } @Data public class Item { //商品Id private long id; //商品数量 private int quantity; //商品单价 private BigDecimal price; //商品优惠 private BigDecimal couponPrice; //商品运费 private BigDecimal deliveryPrice; } //普通用户购物车处理 public class NormalUserCart { public Cart process(long userId, Map<Long, Integer> items) { Cart cart = new Cart(); //把Map的购物车转换为Item列表 List<Item> itemList = new ArrayList<>(); items.entrySet().stream().forEach(entry -> { Item item = new Item(); item.setId(entry.getKey()); item.setPrice(Db.getItemPrice(entry.getKey())); item.setQuantity(entry.getValue()); itemList.add(item); }); cart.setItems(itemList); //处理运费和商品优惠 itemList.stream().forEach(item -> { //运费为商品总价的10% item.setDeliveryPrice(item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())).multiply(new BigDecimal("0.1"))); //无优惠 item.setCouponPrice(BigDecimal.ZERO); }); //计算纯商品总价 cart.setTotalItemPrice(cart.getItems().stream().map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))).reduce(BigDecimal.ZERO, BigDecimal::add)); //计算运费总价 cart.setTotalDeliveryPrice(cart.getItems().stream().map(Item::getDeliveryPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); //计算总优惠 cart.setTotalDiscount(cart.getItems().stream().map(Item::getCouponPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); //应付总价=商品总价+运费总价-总优惠 cart.setPayPrice(cart.getTotalItemPrice().add(cart.getTotalDeliveryPrice()).subtract(cart.getTotalDiscount())); return cart; } } //VIP 用户的购物车逻辑。与普通用户购物车逻辑的不同在于,VIP 用户能享受同类商品多买的折扣 public class VipUserCart { public Cart process(long userId, Map<Long, Integer> items) { Cart cart = new Cart(); List<Item> itemList = new ArrayList<>(); items.entrySet().stream().forEach(entry -> { Item item = new Item(); item.setId(entry.getKey()); item.setPrice(Db.getItemPrice(entry.getKey())); item.setQuantity(entry.getValue()); itemList.add(item); }); cart.setItems(itemList); itemList.stream().forEach(item -> { //运费为商品总价的10% item.setDeliveryPrice(item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())).multiply(new BigDecimal("0.1"))); //购买两件以上相同商品,第三件开始享受一定折扣 if (item.getQuantity() > 2) { item.setCouponPrice(item.getPrice() .multiply(BigDecimal.valueOf(100 - Db.getUserCouponPercent(userId)).divide(new BigDecimal("100"))) .multiply(BigDecimal.valueOf(item.getQuantity() - 2))); } else { item.setCouponPrice(BigDecimal.ZERO); } }); cart.setTotalItemPrice(cart.getItems().stream().map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setTotalDeliveryPrice(cart.getItems().stream().map(Item::getDeliveryPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setTotalDiscount(cart.getItems().stream().map(Item::getCouponPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setPayPrice(cart.getTotalItemPrice().add(cart.getTotalDeliveryPrice()).subtract(cart.getTotalDiscount())); return cart; } } //最后是免运费、无折扣的内部用户 public class InternalUserCart { public Cart process(long userId, Map<Long, Integer> items) { Cart cart = new Cart(); List<Item> itemList = new ArrayList<>(); items.entrySet().stream().forEach(entry -> { Item item = new Item(); item.setId(entry.getKey()); item.setPrice(Db.getItemPrice(entry.getKey())); item.setQuantity(entry.getValue()); itemList.add(item); }); cart.setItems(itemList); itemList.stream().forEach(item -> { //免运费 item.setDeliveryPrice(BigDecimal.ZERO); //无优惠 item.setCouponPrice(BigDecimal.ZERO); }); cart.setTotalItemPrice(cart.getItems().stream().map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setTotalDeliveryPrice(cart.getItems().stream().map(Item::getDeliveryPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setTotalDiscount(cart.getItems().stream().map(Item::getCouponPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); cart.setPayPrice(cart.getTotalItemPrice().add(cart.getTotalDeliveryPrice()).subtract(cart.getTotalDiscount())); return cart; } }
三种购物车 70% 的代码是重复的。原因很简单,虽然不同类型用户计算运费和优惠的方式不同,但整个购物车的初始化、统计总价、总运费、总优惠和支付价格的逻辑都是一样的。
有了三个购物车后,我们就需要根据不同的用户类型使用不同的购物车了。如下代码所示,使用三个 if 实现不同类型用户调用不同购物车的 process 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@GetMapping("wrong") public Cart wrong(@RequestParam("userId") int userId) { //根据用户ID获得用户类型 String userCategory = Db.getUserCategory(userId); //普通用户处理逻辑 if (userCategory.equals("Normal")) { NormalUserCart normalUserCart = new NormalUserCart(); return normalUserCart.process(userId, items); } //VIP用户处理逻辑 if (userCategory.equals("Vip")) { VipUserCart vipUserCart = new VipUserCart(); return vipUserCart.process(userId, items); } //内部用户处理逻辑 if (userCategory.equals("Internal")) { InternalUserCart internalUserCart = new InternalUserCart(); return internalUserCart.process(userId, items); } return null; }
如果我们熟记抽象类和抽象方法的定义的话,这时或许就会想到,是否可以把重复的逻辑定义在抽象类中,三个购物车只要分别实现不同的那份逻辑呢?其实,这个模式就是模板方法模式。我们在父类中实现了购物车处理的流程模板,然后把需要特殊处理的地方留空白也就是留抽象方法定义,让子类去实现其中的逻辑。由于父类的逻辑不完整无法单独工作,因此需要定义为抽象类。
AbstractCart 抽象类实现了购物车通用的逻辑,额外定义了两个抽象方法让子类去实现。其中,processCouponPrice 方法用于计算商品折扣,processDeliveryPrice 方法用于计算运费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
public abstract class AbstractCart { //处理购物车的大量重复逻辑在父类实现 public Cart process(long userId, Map<Long, Integer> items) { Cart cart = new Cart(); List<Item> itemList = new ArrayList<>(); items.entrySet().stream().forEach(entry -> { Item item = new Item(); item.setId(entry.getKey()); item.setPrice(Db.getItemPrice(entry.getKey())); item.setQuantity(entry.getValue()); itemList.add(item); }); cart.setItems(itemList); //让子类处理每一个商品的优惠 itemList.stream().forEach(item -> { processCouponPrice(userId, item); processDeliveryPrice(userId, item); }); //计算商品总价 cart.setTotalItemPrice(cart.getItems().stream().map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity()))).reduce(BigDecimal.ZERO, BigDecimal::add)); //计算总运费 cart.setTotalDeliveryPrice(cart.getItems().stream().map(Item::getDeliveryPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); //计算总折扣 cart.setTotalDiscount(cart.getItems().stream().map(Item::getCouponPrice).reduce(BigDecimal.ZERO, BigDecimal::add)); //计算应付价格 cart.setPayPrice(cart.getTotalItemPrice().add(cart.getTotalDeliveryPrice()).subtract(cart.getTotalDiscount())); return cart; } //处理商品优惠的逻辑留给子类实现 protected abstract void processCouponPrice(long userId, Item item); //处理配送费的逻辑留给子类实现 protected abstract void processDeliveryPrice(long userId, Item item); }
有了这个抽象类,三个子类的实现就非常简单了。普通用户的购物车 NormalUserCart,实现的是 0 优惠和 10% 运费的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
@Service(value = "NormalUserCart") public class NormalUserCart extends AbstractCart { @Override protected void processCouponPrice(long userId, Item item) { item.setCouponPrice(BigDecimal.ZERO); } @Override protected void processDeliveryPrice(long userId, Item item) { item.setDeliveryPrice(item.getPrice() .multiply(BigDecimal.valueOf(item.getQuantity())) .multiply(new BigDecimal("0.1"))); } } //VIP 用户的购物车 VipUserCart,直接继承了 NormalUserCart,只需要修改多买优惠策略 @Service(value = "VipUserCart") public class VipUserCart extends NormalUserCart { @Override protected void processCouponPrice(long userId, Item item) { if (item.getQuantity() > 2) { item.setCouponPrice(item.getPrice() .multiply(BigDecimal.valueOf(100 - Db.getUserCouponPercent(userId)).divide(new BigDecimal("100"))) .multiply(BigDecimal.valueOf(item.getQuantity() - 2))); } else { item.setCouponPrice(BigDecimal.ZERO); } } } //内部用户购物车 InternalUserCart 是最简单的,直接设置 0 运费和 0 折扣即可 @Service(value = "InternalUserCart") public class InternalUserCart extends AbstractCart { @Override protected void processCouponPrice(long userId, Item item) { item.setCouponPrice(BigDecimal.ZERO); } @Override protected void processDeliveryPrice(long userId, Item item) { item.setDeliveryPrice(BigDecimal.ZERO); } }
接下来,我们再看看如何能避免三个 if 逻辑。
定义三个购物车子类时,我们在 @Service 注解中对 Bean 进行了命名。既然三个购物车都叫 XXXUserCart,那我们就可以把用户类型字符串拼接 UserCart构成购物车 Bean 的名称,然后利用 Spring 的 IoC 容器,通过 Bean 的名称直接获取到AbstractCart,调用其 process 方法即可实现通用
其实,这就是工厂模式,只不过是借助 Spring 容器实现罢了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
@Slf4j @RestController @RequestMapping("templatemethod") public class TemplateMethodController { private static Map<Long, Integer> items = new HashMap<>(); static { items.put(1L, 2); items.put(2L, 4); } @Autowired private ApplicationContext applicationContext; @GetMapping("wrong") public Cart wrong(@RequestParam("userId") int userId) { String userCategory = Db.getUserCategory(userId); if (userCategory.equals("Normal")) { NormalUserCart normalUserCart = new NormalUserCart(); return normalUserCart.process(userId, items); } if (userCategory.equals("Vip")) { VipUserCart vipUserCart = new VipUserCart(); return vipUserCart.process(userId, items); } if (userCategory.equals("Internal")) { InternalUserCart internalUserCart = new InternalUserCart(); return internalUserCart.process(userId, items); } return null; } @GetMapping("right") public Cart right(@RequestParam("userId") int userId) { String userCategory = Db.getUserCategory(userId); AbstractCart cart = (AbstractCart) applicationContext.getBean(userCategory + "UserCart"); return cart.process(userId, items); } }
之后如果有了新的用户类型、新的用户逻辑,是不是完全不用对代码做任何修改,只要新增一个 XXXUserCart 类继承 AbstractCart,实现特殊的优惠和运费处理逻辑就可以了
- 这样一来,我们就利用工厂模式 + 模板方法模式,不仅消除了重复代码,还避免了修改既有代码的风险。这就是设计模式中的开闭原则:对修改关闭,对扩展开放
21.2:利用注解和反射
假设银行提供了一些 API 接口,对参数的序列化有点特殊,不使用 JSON,而是需要我们把参数依次拼在一起构成一个大字符串。
按照银行提供的 API 文档的顺序,把所有参数构成定长的数据,然后拼接在一起作为整个字符串。
因为每一种参数都有固定长度,未达到长度时需要做填充处理:
- 字符串类型的参数不满长度部分需要以下划线右填充,也就是字符串内容靠左;
- 数字类型的参数不满长度部分以 0 左填充,也就是实际数字靠右
- 货币类型的表示需要把金额向下舍入 2 位到分,以分为单位,作为数字类型同样进行左填充
对所有参数做 MD5 操作作为签名(为了方便理解,Demo 中不涉及加盐处理)。
比如,创建用户方法和支付方法的定义是这样的
代码很容易实现,直接根据接口定义实现填充操作、加签名、请求调用操作即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
public class BankService { public static String createUser(String name, String identity, String mobile, int age) throws IOException { //创建用户方法 StringBuilder stringBuilder = new StringBuilder(); //字符串靠左,多余的地方_填充 stringBuilder.append(String.format("%-10s", name).replace(' ', '_')); //字符串靠左,多余的地方_填充 stringBuilder.append(String.format("%-18s", identity).replace(' ', '_')); //数字靠右,多余的地方用0填充 stringBuilder.append(String.format("%05d", age)); //字符串靠左,多余的地方_填充 stringBuilder.append(String.format("%-11s", mobile).replace(' ', '_')); //最后加上MD5作为签名 stringBuilder.append(DigestUtils.md2Hex(stringBuilder.toString())); return Request.Post("http://localhost:45678/reflection/bank/createUser") .bodyString(stringBuilder.toString(), ContentType.APPLICATION_JSON) .execute().returnContent().asString(); } //支付方法 public static String pay(long userId, BigDecimal amount) throws IOException { StringBuilder stringBuilder = new StringBuilder(); //数字靠右,多余的地方用0填充 stringBuilder.append(String.format("%020d", userId)); //金额向下舍入2位到分,以分为单位,作为数字靠右,多余的地方用0填充 stringBuilder.append(String.format("%010d", amount.setScale(2, RoundingMode.DOWN).multiply(new BigDecimal("100")).longValue())); //最后加上MD5作为签名 stringBuilder.append(DigestUtils.md2Hex(stringBuilder.toString())); return Request.Post("http://localhost:45678/reflection/bank/pay") .bodyString(stringBuilder.toString(), ContentType.APPLICATION_JSON) .execute().returnContent().asString(); } }
可以看到,这段代码的重复粒度更细:
- 三种标准数据类型的处理逻辑有重复,稍有不慎就会出现 Bug
- 处理流程中字符串拼接、加签和发请求的逻辑,在所有方法重复
- 实际方法的入参的参数类型和顺序,不一定和接口要求一致,容易出错
- 代码层面针对每一个参数硬编码,无法清晰地进行核对,如果参数达到几十个、上百个,出错的概率极大
使用注解和反射这两个武器,就可以针对银行请求的所有逻辑均使用一套代码实现,不会出现任何重复。
要实现接口逻辑和逻辑实现的剥离,首先需要以 POJO 类(只有属性没有任何业务逻辑的数据类)的方式定义所有的接口参数
1 2 3 4 5 6 7 8 9 10 11 12
@Data public class CreateUserAPI { private String name; private String identity; private String mobile; private int age; }
有了接口参数定义,我们就能通过自定义注解为接口和所有参数增加一些元数据。如下所示,我们定义一个接口 API 的注解 BankAPI,包含接口 URL 地址和接口说明
1 2 3 4 5 6 7 8 9
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Inherited public @interface BankAPI { String desc() default ""; String url() default ""; }
//再定义一个自定义注解 @BankAPIField,用于描述接口的每一个字段规范,包含参数的次序、类型和长度三个属性
1
2
3
4
5
6
7
8
9
10
11
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@Documented
@Inherited
public @interface BankAPIField {
int order() default -1;
int length() default -1;
String type() default "";
}
我们定义了 CreateUserAPI 类描述创建用户接口的信息,通过为接口增加@BankAPI 注解,来补充接口的 URL 和描述等元数据;通过为每一个字段增加@BankAPIField 注解,来补充参数的顺序、类型和长度等元数据
这 2 个类继承的 AbstractAPI 类是一个空实现,因为这个案例中的接口并没有公共数据可以抽象放到基类。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractAPI {
}
@BankAPI(url = "/bank/createUser", desc = "创建用户接口")
@Data
public class CreateUserAPI extends AbstractAPI {
@BankAPIField(order = 1, type = "S", length = 10)
private String name;
@BankAPIField(order = 2, type = "S", length = 18)
private String identity;
@BankAPIField(order = 4, type = "S", length = 11)
private String mobile;
@BankAPIField(order = 3, type = "N", length = 5)
private int age;
}
@BankAPI(url = "/bank/pay", desc = "支付接口")
@Data
public class PayAPI extends AbstractAPI{
@BankAPIField(order = 1, type = "N", length = 20)
private long userId;
@BankAPIField(order = 2, type = "M", length = 10)
private BigDecimal amount;
}
通过这 2 个类,我们可以在几秒钟内完成和 API 清单表格的核对。理论上,如果我们的核心翻译过程(也就是把注解和接口 API 序列化为请求需要的字符串的过程)没问题,只要注解和表格一致,API 请求的翻译就不会有任何问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
@Slf4j public class BetterBankService { //创建用户方法 public static String createUser(String name, String identity, String mobile, int age) throws IOException { CreateUserAPI createUserAPI = new CreateUserAPI(); createUserAPI.setName(name); createUserAPI.setIdentity(identity); createUserAPI.setAge(age); createUserAPI.setMobile(mobile); return remoteCall(createUserAPI); } //支付方法 public static String pay(long userId, BigDecimal amount) throws IOException { PayAPI payAPI = new PayAPI(); payAPI.setUserId(userId); payAPI.setAmount(amount); return remoteCall(payAPI); } /* 所有处理参数排序、填充、加签、请求调用的核心逻辑,都汇聚在了remoteCall 方法中。有了这个核心方法,BankService 中每一个接口的实现就非常简单了,只是参数的组装,然后调用 remoteCall 即可 */ private static String remoteCall(AbstractAPI api) throws IOException { //从BankAPI注解获取请求地址 //我们从类上获得了 BankAPI 注解,然后拿到其 URL 属性,后续进行远程调用。 BankAPI bankAPI = api.getClass().getAnnotation(BankAPI.class); bankAPI.url(); StringBuilder stringBuilder = new StringBuilder(); //使用 stream 快速实现了获取类中所有带 BankAPIField 注解的字段,并把字段按 order 属性排序,然后设置私有字段反射可访问。 Arrays.stream(api.getClass().getDeclaredFields()) //获得所有字段 .filter(field -> field.isAnnotationPresent(BankAPIField.class)) //查找标记了注解的字段 .sorted(Comparator.comparingInt(a -> a.getAnnotation(BankAPIField.class).order())) //根据注解中的order对字段排序 .peek(field -> field.setAccessible(true)) //设置可以访问私有字段 .forEach(field -> { //现了反射获取注解的值,然后根据 BankAPIField 拿到的参数类型,按照三种标准进行格式化,将所有参数的格式化逻辑集中在了这一处 //获得注解 BankAPIField bankAPIField = field.getAnnotation(BankAPIField.class); Object value = ""; try { //反射获取字段值 value = field.get(api); } catch (IllegalAccessException e) { e.printStackTrace(); } //根据字段类型以正确的填充方式格式化字符串 switch (bankAPIField.type()) { case "S": { stringBuilder.append(String.format("%-" + bankAPIField.length() + "s", value.toString()).replace(' ', '_')); break; } case "N": { stringBuilder.append(String.format("%" + bankAPIField.length() + "s", value.toString()).replace(' ', '0')); break; } case "M": { if (!(value instanceof BigDecimal)) throw new RuntimeException(String.format("{} 的 {} 必须是BigDecimal", api, field)); stringBuilder.append(String.format("%0" + bankAPIField.length() + "d", ((BigDecimal) value).setScale(2, RoundingMode.DOWN).multiply(new BigDecimal("100")).longValue())); break; } default: break; } }); //签名逻辑 stringBuilder.append(DigestUtils.md2Hex(stringBuilder.toString())); String param = stringBuilder.toString(); long begin = System.currentTimeMillis(); //发请求 String result = Request.Post("http://localhost:45678/reflection" + bankAPI.url()) .bodyString(param, ContentType.APPLICATION_JSON) .execute().returnContent().asString(); log.info("调用银行API {} url:{} 参数:{} 耗时:{}ms", bankAPI.desc(), bankAPI.url(), param, System.currentTimeMillis() - begin); return result; } }
- 许多涉及类结构性的通用处理,都可以按照这个模式来减少重复代码。反射给予了我们在不知晓类结构的时候,按照固定的逻辑处理类的成员;而注解给了我们为这些成员补充元数据的能力,使得我们利用反射实现通用逻辑的时候,可以从外部获得更多我们关心的数据。
21.3:属性拷贝工具消除重复代码
21.3.1 Apache工具
在Apache Commons BeanUtils库中,有两个常用的方法可以用来拷贝对象的属性:PropertyUtils.copyProperties() 和 BeanUtils.copyProperties()。这两个方法之间的主要区别在于它们处理属性的方式不同。
PropertyUtils.copyProperties()PropertyUtils.copyProperties()方法是基于属性的拷贝,它只会拷贝对象的属性值,不会拷贝对象的类型转换器、事件监听器等其他信息。- 这个方法是比较底层的,仅仅复制属性的值,不涉及类型转换等高级功能。
- 适合在需要简单属性拷贝的情况下使用,比如拷贝基本数据类型的属性。
- 这个方法会忽略源对象中属性值为 null 的属性,不会将这些属性拷贝到目标对象中。
BeanUtils.copyProperties()BeanUtils.copyProperties()方法是基于JavaBeans规范的拷贝,它会尝试自动进行类型转换,以确保源对象的属性值可以正确地赋值给目标对象的属性。- 这个方法更智能,会尝试根据属性类型进行自动转换,比如将字符串转换为整数等。
- 适合在需要处理类型转换的情况下使用,比如拷贝包含不同类型属性的对象。
- 这个方法会将源对象中属性值为 null 的属性拷贝到目标对象中,即使属性值为 null。
总的来说,如果您只需要简单地拷贝对象的属性值,并且不需要进行类型转换等高级操作,可以使用 PropertyUtils.copyProperties() 方法;如果您需要处理类型转换等复杂情况,可以选择使用 BeanUtils.copyProperties() 方法。
结合两者的优缺点共有两种方案
1:使用类型转换器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import org.apache.commons.beanutils.BeanUtilsBean;
import org.apache.commons.beanutils.ConvertUtils;
import org.apache.commons.beanutils.converters.LongConverter;
import org.apache.commons.beanutils.converters.IntegerConverter;
public class CustomBeanUtils {
public static void main(String[] args) {
// 创建自定义的 ConvertUtilsBean 对象并配置类型转换器
ConvertUtilsBean convertUtilsBean = new ConvertUtilsBean();
convertUtilsBean.register(new LongConverter(null), Long.class);
convertUtilsBean.register(new IntegerConverter(null), Integer.class);
// 创建 BeanUtilsBean 对象并设置自定义的 ConvertUtilsBean
BeanUtilsBean beanUtilsBean = new BeanUtilsBean(convertUtilsBean);
// 创建源对象和目标对象
SourceBean source = new SourceBean();
source.setAge("30");
source.setSalary("1000");
TargetBean target = new TargetBean();
try {
// 使用 PropertyUtils.copyProperties() 方法拷贝属性
beanUtilsBean.copyProperties(target, source);
// 输出目标对象的属性
System.out.println(target.getAge()); // 输出:30
System.out.println(target.getSalary()); // 输出:1000
} catch (Exception e) {
e.printStackTrace();
}
}
// 示例的源对象类
public static class SourceBean {
private String age;
private String salary;
// 省略 getter 和 setter 方法
}
// 示例的目标对象类
public static class TargetBean {
private Integer age;
private Integer salary;
// 省略 getter 和 setter 方法
}
}
2:自定义工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.FatalBeanException;
import org.springframework.util.Assert;
import java.beans.PropertyDescriptor;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
/**
* 属性拷贝工具类
*
* @author qinfen
* @date 2022/07/14
*/
public class MyCopyUtils {
/**
* 功能说明: 只复制source对象的非空属性到target对象上
*
* @param source 源
* @param target 目标
* @throws BeansException exception
*/
public static void copyNoNullProperties(Object source, Object target) throws BeansException {
Assert.notNull(source, "Source must not be null");
Assert.notNull(target, "Target must not be null");
Class<?> actualEditable = target.getClass();
PropertyDescriptor[] targetPds = BeanUtils.getPropertyDescriptors(actualEditable);
for (PropertyDescriptor targetPd : targetPds) {
if (targetPd.getWriteMethod() != null) {
PropertyDescriptor sourcePd = BeanUtils.getPropertyDescriptor(source.getClass(), targetPd.getName());
if (sourcePd != null && sourcePd.getReadMethod() != null) {
try {
Method readMethod = sourcePd.getReadMethod();
if (!Modifier.isPublic(readMethod.getDeclaringClass().getModifiers())) {
readMethod.setAccessible(true);
}
Object value = readMethod.invoke(source);
// 这里判断以下value是否为空 当然这里也能进行一些特殊要求的处理 例如绑定时格式转换等等
if (value != null) {
Method writeMethod = targetPd.getWriteMethod();
if (!Modifier.isPublic(writeMethod.getDeclaringClass().getModifiers())) {
writeMethod.setAccessible(true);
}
writeMethod.invoke(target, value);
}
} catch (Throwable ex) {
throw new FatalBeanException("Could not copy properties from source to target", ex);
}
}
}
}
}
}
总结,尽量在设计阶段规避问题,尽量只使用属性值拷贝
21.3.2 mapstruct
MapStruct是在编译时根据注解生成实际的Java代码来执行对象转换,这意味着运行时不再依赖反射机制,从而可能获得更好的性能和类型安全保证。
在MapStruct这样的自动化映射工具中,具体的转换规则通常是基于源对象和目标对象的属性名匹配以及它们的数据类型兼容性来自动处理的。若属性名相同且类型一致或可转换,MapStruct可以自动完成映射。
MapStruct提供了丰富的映射定制能力,可以通过注解精确控制属性间的映射关系、自定义转换逻辑,包括处理不同类型属性的转换、忽略某些属性等。
使用MapStruct可以将转换逻辑集中在一处,通过注解方式表达,使得映射规则更清晰、易于理解和维护。
在大型项目中,尤其是在强调模块化、代码复用和长期维护性的场合,MapStruct由于其良好的组织结构和代码生成特性,可以提供更好的扩展性和可控性。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Mapper(builder = @Builder(disableBuilder = true))
public interface RecordConvert {
RecordConvert INSTANCE = Mappers.getMapper(RecordConvert.class);
// 基础映射,属性名称相同的简单映射;明确指定源对象中的属性与目标对象中属性的映射关系。
@Mapping(source = "sourceProperty", target = "targetProperty")
RecordData sessionConvertRecord(SessionRecordModel model);
// 列表转换
List<RecordData> sessionConvertRecord(List<SessionRecordModel> models);
// 自定义转换逻辑
@Mapping(source = "startDate", target = "recordStartDateTime", qualifiedByName = "customDateToLocalDateTime")
RecordData advancedConvert(SessionRecordModel model);
// 定义一个自定义转换方法
@Named("customDateToLocalDateTime")
default LocalDateTime customDateToLocalDateTime(Date startDate) {
return startDate.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
}
// 指示MapStruct在转换过程中忽略指定的源对象属性。
@Mapping(ignore = true)
RecordData ignoreSourceProperty(SessionRecordModel model);
}
在MapStruct中,您可以定义一个基接口,然后让各个实体类对应的转换接口继承这个基接口,以减少重复的注解和方法定义
基类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Mapper(uses = {CustomDateConverter.class}, builder = @Builder(disableBuilder = true))
public interface BaseConverter<S, T> {
T convert(S source);
List<T> convertList(List<S> sources);
// 可以在这里定义通用的转换规则和默认转换方法
class CustomDateConverter {
// 通用日期转换方法
@Named("customDateToLocalDateTime")
LocalDateTime dateToLocalDateTime(Date date) {
return date.toInstant().atZone(ZoneId.systemDefault()).toLocalDateTime();
}
}
}
// 然后针对具体实体类定义转换接口并继承BaseConverter
@Mapper
public interface RecordConvert extends BaseConverter<SessionRecordModel, RecordData> {
//用于获取实现了该接口的具体类的实例。这样设计的好处在于,无需显式创建RecordConvert接口的实现类对象,就可以直接调用接口中的转换方法进行类型转换操作
RecordConvert INSTANCE = Mappers.getMapper(RecordConvert.class);
// 如果SessionRecordModel和RecordData之间有特殊的映射规则,可以在这里添加额外的注解和方法
// 若没有特殊规则,则可以直接使用基类中的convert()和convertList()方法
}
//使用
buildList.add(RecordConvert.INSTANCE.sessionConvertRecord(e));
21.4:思考与讨论
- 除了模板方法设计模式是减少重复代码的一把好手,观察者模式也常用于减少代码重复(并且是松耦合方式)。Spring 也提供了类似工具你能想到有哪些应用场景吗?
- 关于 Bean 属性复制工具,除了最简单的 Spring 的 BeanUtils 工具类的使用,你还知道哪些对象映射类库吗?它们又有什么功能呢?
22:接口设计
接口的设计需要考虑的点非常多,比如接口的命名、参数列表、包装结构体、接口粒度、版本策略、幂等性实现、同步异步处理方式
等
和接口设计相关比较重要的点有三个,分别是包装结构体、版本策略、同步异步处理方式。
22.1:明确处理结果
为了将接口设计得更合理,我们需要考虑如下两个原则:
- 对外隐藏内部实现。
- 设计接口结构时,明确每个字段的含义,以及客户端的处理方式。
为了简化服务端代码,我们可以把包装 API 响应体 APIResponse的工作交由框架自动完成,这样直接返回 DTO OrderInfo 即可。对于业务逻辑错误,可以抛出一个自定义异常
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@GetMapping("server") public OrderInfo server(@RequestParam("userId") Long userId) { if (userId == null) { throw new APIException(3001, "Illegal userId"); } if (userId == 1) { //把订单服务的错误包装转换 //同时日志记录内部错误 log.warn("用户 {} 调用订单服务失败,原因是 Risk order detected", userId); throw new APIException(3002, "Internal Error, order is cancelled"); } return new OrderInfo("Created", 2L); }
在 APIException 中包含错误码和错误消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
public class APIException extends RuntimeException { @Getter private int errorCode; @Getter private String errorMessage; public APIException(int errorCode, String errorMessage) { super(errorMessage); this.errorCode = errorCode; this.errorMessage = errorMessage; } public APIException(Throwable cause, int errorCode, String errorMessage) { super(errorMessage, cause); this.errorCode = errorCode; this.errorMessage = errorMessage; } }
然后,定义一个 @RestControllerAdvice 来完成自动包装响应体的工作:
- 通过实现 ResponseBodyAdvice 接口的 beforeBodyWrite 方法,来处理成功请求的响应体转换
- 实现一个 @ExceptionHandler 来处理业务异常时,APIException 到 APIResponse 的转换
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
//此段代码只是Demo,生产级应用还需要扩展很多细节 @RestControllerAdvice @Slf4j public class APIResponseAdvice implements ResponseBodyAdvice<Object> { @Autowired private ObjectMapper objectMapper; //自动处理APIException,包装为APIResponse @ExceptionHandler(APIException.class) public APIResponse handleApiException(HttpServletRequest request, APIException ex) { log.error("process url {} failed", request.getRequestURL().toString(), ex); APIResponse apiResponse = new APIResponse(); apiResponse.setSuccess(false); apiResponse.setCode(ex.getErrorCode()); apiResponse.setMessage(ex.getErrorMessage()); return apiResponse; } @ExceptionHandler(NoHandlerFoundException.class) public APIResponse handleException(NoHandlerFoundException ex) { log.error(ex.getMessage(), ex); APIResponse apiResponse = new APIResponse(); apiResponse.setSuccess(false); apiResponse.setCode(4000); apiResponse.setMessage(ex.getMessage()); return apiResponse; } //仅当方法或类没有标记@NoAPIResponse才自动包装 @Override public boolean supports(MethodParameter returnType, Class converterType) { return returnType.getParameterType() != APIResponse.class && AnnotationUtils.findAnnotation(returnType.getMethod(), NoAPIResponse.class) == null && AnnotationUtils.findAnnotation(returnType.getDeclaringClass(), NoAPIResponse.class) == null; } //自动包装外层APIResposne响应 @SneakyThrows @Override public Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) { APIResponse apiResponse = new APIResponse(); apiResponse.setSuccess(true); apiResponse.setMessage("OK"); apiResponse.setCode(2000); apiResponse.setData(body); if (body instanceof String) { response.getHeaders().setContentType(MediaType.APPLICATION_JSON); return objectMapper.writeValueAsString(apiResponse); } else { return apiResponse; } } }
在这里,我们实现了一个 @NoAPIResponse 自定义注解。如果某些 @RestController 的接口不希望实现自动包装的话,可以标记这个注解
1 2 3 4
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface NoAPIResponse { }
在 ResponseBodyAdvice 的 support 方法中,我们排除了标记有这个注解的方法或类的自动响应体包装。比如,对于刚才我们实现的测试客户端 client 方法不需要包装为APIResponse,就可以标记上这个注解:这样我们的业务逻辑中就不需要考虑响应体的包装,代码会更简洁
1 2 3 4 5
@GetMapping("client") @NoAPIResponse publicString client(@RequestParam(value = "error", defaultValue = "0") int error){ }
22.2:考虑接口变迁的版本控制策略
接口不可能一成不变,需要根据业务需求不断增加内部逻辑。如果做大的功能调整或重构,涉及参数定义的变化或是参数废弃,导致接口无法向前兼容,这时接口就需要有版本的概念。在考虑接口版本策略设计时,我们需要注意的是,最好一开始就明确版本策略,并考虑在整个服务端统一版本策略
第一,版本策略最好一开始就考虑
既然接口总是要变迁的,那么最好一开始就确定版本策略。比如,确定是通过 URL Path 实现,是通过 QueryString 实现,还是通过 HTTP 头实现。这三种实现方式的代码如下
1 2 3 4 5 6 7 8 9 10 11 12
//通过URL Path实现版本控制 @GetMapping("/v1/api/user") public int right1(){return1;} //通过QueryString中的version参数实现版本控制 @GetMapping(value = "/api/user", params = "version=2") public int right2(@RequestParam("version")int version) {return2;} //通过请求头中的X-API-VERSION参数实现版本控制 @GetMapping(value = "/api/user", headers = "X-API-VERSION=3") public int right3(@RequestHeader("X-API-VERSION")int version) {return3;}
这三种方式中,URL Path 的方式最直观也最不容易出错;QueryString 不易携带,不太推荐作为公开 API 的版本策略;HTTP 头的方式比较没有侵入性,如果仅仅是部分接口需要进行版本控制,可以考虑这种方式。
第二,版本实现方式要统一
相比于在每一个接口的 URL Path 中设置版本号,更理想的方式是在框架层面实现统一。如果你使用 Spring 框架的话,可以按照下面的方式自定义RequestMappingHandlerMapping 来实现。
首先,创建一个注解来定义接口的版本。@APIVersion 自定义注解可以应用于方法或Controller 上
1 2 3 4
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface APIVersion {String[] value();}
然后,定义一个 APIVersionHandlerMapping 类继承RequestMappingHandlerMapping。RequestMappingHandlerMapping 的作用,是根据类或方法上的 @RequestMapping来生成 RequestMappingInfo 的实例。我们覆盖 registerHandlerMethod 方法的实现,从 @APIVersion 自定义注解中读取版本信息,拼接上原有的、不带版本号的 URL Pattern,构成新的 RequestMappingInfo,来通过注解的方式为接口增加基于 URL 的版本号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
public class APIVersionHandlerMapping extends RequestMappingHandlerMapping { @Override protected boolean isHandler(Class<?> beanType) { return AnnotatedElementUtils.hasAnnotation(beanType, Controller.class); } @Override protected void registerHandlerMethod(Object handler, Method method, RequestMappingInfo mapping) { Class<?> controllerClass = method.getDeclaringClass(); //类上的APIVersion注解 APIVersion apiVersion = AnnotationUtils.findAnnotation(controllerClass, APIVersion.class); //方法上的APIVersion注解 APIVersion methodAnnotation = AnnotationUtils.findAnnotation(method, APIVersion.class); //以方法上的注解优先 if (methodAnnotation != null) { apiVersion = methodAnnotation; } String[] urlPatterns = apiVersion == null ? new String[0] : apiVersion.value(); PatternsRequestCondition apiPattern = new PatternsRequestCondition(urlPatterns); PatternsRequestCondition oldPattern = mapping.getPatternsCondition(); PatternsRequestCondition updatedFinalPattern = apiPattern.combine(oldPattern); //重新构建RequestMappingInfo mapping = new RequestMappingInfo(mapping.getName(), updatedFinalPattern, mapping.getMethodsCondition(), mapping.getParamsCondition(), mapping.getHeadersCondition(), mapping.getConsumesCondition(), mapping.getProducesCondition(), mapping.getCustomCondition()); super.registerHandlerMethod(handler, method, mapping); } }
最后,也是特别容易忽略的一点,要通过实现 WebMvcRegistrations 接口,来生效自定义的 APIVersionHandlerMapping
1 2 3 4 5 6 7 8 9 10 11 12
@SpringBootApplication public class CommonMistakesApplication implements WebMvcRegistrations { public static void main(String[] args) { SpringApplication.run(CommonMistakesApplication.class, args); } @Override public RequestMappingHandlerMapping getRequestMappingHandlerMapping() { return new APIVersionHandlerMapping(); } }
这样,就实现了在 Controller 上或接口方法上通过注解,来实现以统一的 Pattern 进行版本号控制:
1 2 3 4
@GetMapping(value = "/api/user") @APIVersion("v4") public int right4(){return4;}
加上注解后,访问浏览器查看效果:localhost:8080/v4/api/user
使用框架来明确 API 版本的指定策略,不仅实现了标准化,更实现了强制的 API 版本控制。对上面代码略做修改,我们就可以实现不设置 @APIVersion 接口就给予报错提示。
实现一套统一的基于请求头方式的版本控制
注解
1 2 3 4 5 6 7
@Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface APIVersion { String value(); String headerKey() default "X-API-VERSION"; }
mapping
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
public class APIVersionHandlerMapping extends RequestMappingHandlerMapping { @Override protected boolean isHandler(Class<?> beanType) { return AnnotatedElementUtils.hasAnnotation(beanType, Controller.class); } @Override protected RequestCondition<APIVersionCondition> getCustomTypeCondition(Class<?> handlerType) { APIVersion apiVersion = AnnotationUtils.findAnnotation(handlerType, APIVersion.class); return createCondition(apiVersion); } @Override protected RequestCondition<APIVersionCondition> getCustomMethodCondition(Method method) { APIVersion apiVersion = AnnotationUtils.findAnnotation(method, APIVersion.class); return createCondition(apiVersion); } private RequestCondition<APIVersionCondition> createCondition(APIVersion apiVersion) { return apiVersion == null ? null : new APIVersionCondition(apiVersion.value(), apiVersion.headerKey()); } }
condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
public class APIVersionCondition implements RequestCondition<APIVersionCondition> { @Getter private String apiVersion; @Getter private String headerKey; public APIVersionCondition(String apiVersion, String headerKey) { this.apiVersion = apiVersion; this.headerKey = headerKey; } @Override public APIVersionCondition combine(APIVersionCondition other) { return new APIVersionCondition(other.getApiVersion(), other.getHeaderKey()); } @Override public APIVersionCondition getMatchingCondition(HttpServletRequest request) { String version = request.getHeader(headerKey); return apiVersion.equals(version) ? this : null; } @Override public int compareTo(APIVersionCondition other, HttpServletRequest request) { return 0; } }
application
1 2 3 4 5 6 7 8 9 10 11 12 13
@SpringBootApplication public class CommonMistakesApplication implements WebMvcRegistrations { public static void main(String[] args) { SpringApplication.run(CommonMistakesApplication.class, args); } @Override public RequestMappingHandlerMapping getRequestMappingHandlerMapping() { return new APIVersionHandlerMapping(); } }
controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
@Slf4j @RequestMapping("apiversion") @RestController @APIVersion("v1") public class APIVersoinController { @GetMapping(value = "/api/user") public int version1() { return 1; } @GetMapping(value = "/api/user") @APIVersion("v2") public int version2() { return 2; } }
22.3:接口处理方式要明确同步还是异步
有一个文件上传服务 FileService,其中一个 upload 文件上传接口特别慢,原因是这个上传接口在内部需要进行两步操作,首先上传原图,然后压缩后上传缩略图。如果每一步都耗时 5 秒的话,那么这个接口返回至少需要 10 秒的时间。
于是,把接口改为了异步处理,每一步操作都限定了超时时间,也就是分别把上传原文件和上传缩略图的操作提交到线程池,然后等待一定的时间
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
@Service public class FileService { private ExecutorService threadPool = Executors.newFixedThreadPool(2); private AtomicInteger atomicInteger = new AtomicInteger(0); private ConcurrentHashMap<String, SyncQueryUploadTaskResponse> downloadUrl = new ConcurrentHashMap<>(); //两个文件上传方法uploadFile和uploadThumbnailFile的实现 private String uploadFile(byte[] data) { try { TimeUnit.MILLISECONDS.sleep(500 + ThreadLocalRandom.current().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "http://www.demo.com/download/" + UUID.randomUUID().toString(); } private String uploadThumbnailFile(byte[] data) { try { TimeUnit.MILLISECONDS.sleep(1500 + ThreadLocalRandom.current().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } return "http://www.demo.com/download/" + UUID.randomUUID().toString(); } //① 上传接口的请求和响应比较简单,传入二进制文件,传出原文件和缩略图下载地址 public UploadResponse upload(UploadRequest request) { UploadResponse response = new UploadResponse(); //上传原始文件任务提交到线程池处理 Future<String> uploadFile = threadPool.submit(() -> uploadFile(request.getFile())); //上传缩略图任务提交到线程池处理 Future<String> uploadThumbnailFile = threadPool.submit(() -> uploadThumbnailFile(request.getFile())); //等待上传原始文件任务完成,最多等待1秒 try { response.setDownloadUrl(uploadFile.get(1, TimeUnit.SECONDS)); } catch (Exception e) { e.printStackTrace(); } //等待上传缩略图任务完成,最多等待1秒 try { response.setThumbnailDownloadUrl(uploadThumbnailFile.get(1, TimeUnit.SECONDS)); } catch (Exception e) { e.printStackTrace(); } return response; } } @Data public class UploadRequest{private byte[] file;} @Data public class UploadResponse{ private String downloadUrl; private String thumbnailDownloadUrl; }
这种实现方式的问题是什么?从接口命名上看虽然是同步上传操作,但其内部通过线程池进行异步上传,并因为设置了较短超时所以接口整体响应挺快。但是,一旦遇到超时,接口就不能返回完整的数据,不是无法拿到原文件下载地址,就是无法拿到缩略图下载地址,接口的行为变得不可预测
所以,这种优化接口响应速度的方式并不可取,更合理的方式是,让上传接口要么是彻底的同步处理,要么是彻底的异步处理:
- 所谓同步处理,接口一定是同步上传原文件和缩略图的,调用方可以自己选择调用超时,如果来得及可以一直等到上传完成,如果等不及可以结束等待,下一次再重试
- 所谓异步处理,接口是两段式的,上传接口本身只是返回一个任务 ID,然后异步做上传操作,上传接口响应很快,客户端需要之后再拿着任务 ID 调用任务查询接口查询上传的文件 URL
同步上传接口的实现代码如下,把超时的选择留给客户端
1 2 3 4 5 6 7
//这里的 SyncUploadRequest 和 SyncUploadResponse 类,与之前定义的UploadRequest 和 UploadResponse 是一致的。对于接口的入参和出参 DTO 的命名,我比较建议的方式是,使用接口名 +Request 和 Response 后缀 public SyncUploadResponse syncUpload(SyncUploadRequest request) { SyncUploadResponse response = new SyncUploadResponse(); response.setDownloadUrl(uploadFile(request.getFile())); response.setThumbnailDownloadUrl(uploadThumbnailFile(request.getFile())); return response; }
异步的上传文件接口如何实现。异步上传接口在出参上有点区别,不再返回文件 URL,而是返回一个任务 ID;在接口实现上,我们同样把上传任务提交到线程池处理,但是并不会同步等待任务完成,而是完成后把结果写入一个 HashMap,任务查询接口通过查询这个 HashMap 来获得文件的 URL
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
//计数器,作为上传任务的ID private AtomicInteger atomicInteger = new AtomicInteger(0); //暂存上传操作的结果,生产代码需要考虑数据持久化 private ConcurrentHashMap<String, SyncQueryUploadTaskResponse> downloadUrl = new ConcurrentHashMap<>(); //异步上传操作 public AsyncUploadResponse asyncUpload(AsyncUploadRequest request) { AsyncUploadResponse response = new AsyncUploadResponse(); //生成唯一的上传任务ID String taskId = "upload" + atomicInteger.incrementAndGet(); //异步上传操作只返回任务ID response.setTaskId(taskId); //提交上传原始文件操作到线程池异步处理 threadPool.execute(() -> { String url = uploadFile(request.getFile()); //如果ConcurrentHashMap不包含Key,则初始化一个SyncQueryUploadTaskResponse downloadUrl.computeIfAbsent(taskId, id -> new SyncQueryUploadTaskResponse(id)).setDownloadUrl(url); }); //提交上传缩略图操作到线程池异步处理 threadPool.execute(() -> { String url = uploadThumbnailFile(request.getFile()); downloadUrl.computeIfAbsent(taskId, id -> new SyncQueryUploadTaskResponse(id)).setThumbnailDownloadUrl(url); }); return response; } @Data public class AsyncUploadRequest{ private byte[] file; } @Data public class AsyncUploadResponse{ private String taskId; }
文件上传查询接口则以任务 ID 作为入参,返回两个文件的下载地址,因为文件上传查询接口是同步的,所以直接命名为 syncQueryUploadTask
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
//syncQueryUploadTask接口入参 @Data @RequiredArgsConstructor public class SyncQueryUploadTaskRequest{ private final String taskId;//使用上传文件任务ID查询上传结果 } //syncQueryUploadTask接口出参 @Data @RequiredArgsConstructor public class SyncQueryUploadTaskResponse{ private final String taskId; //任务ID private String downloadUrl; //原始文件下载URL private String thumbnailDownloadUrl; //缩略图下载URL }
1 2 3 4 5 6 7 8
public SyncQueryUploadTaskResponse syncQueryUploadTask(SyncQueryUploadTaskRequest request) { SyncQueryUploadTaskResponse response = new SyncQueryUploadTaskResponse(request.getTaskId()); //从之前定义的downloadUrl ConcurrentHashMap查询结果 response.setDownloadUrl(downloadUrl.getOrDefault(request.getTaskId(), response).getDownloadUrl()); response.setThumbnailDownloadUrl(downloadUrl.getOrDefault(request.getTaskId(), response).getThumbnailDownloadUrl()); return response; }
经过改造的 FileService 不再提供一个看起来是同步上传,内部却是异步上传的 upload 方法,改为提供很明确的:
- 同步上传接口 syncUpload
- 异步上传接口 asyncUpload,搭配 syncQueryUploadTask 查询上传结果
- 使用方可以根据业务性质选择合适的方法:如果是后端批处理使用,那么可以使用同步上传,多等待一些时间问题不大;如果是面向用户的接口,那么接口响应时间不宜过长,可以调用异步上传接口,然后定时轮询上传结果,拿到结果再显示
22.4:防止接口重复提交
简介:在间隔很短的时间周期内对同一个请求URL发起请求,导致前端开发者在很短的时间周期内将同一份数据(请求体)提交到后端相同的接口 多次,最终数据库出现多条主键ID不一样而其他业务数据几乎一毛一样的记录;
其过程可以归为“多线程并发导致并发安全”的问题范畴;
如果重复发起的请求足够多、请求体容量足够大,很可能会给系统接口带来极大的压力,导致其出现“接口不稳定”、“DB负载过高”,严重点甚至可能会出现“系统宕机”的情况
在实际项目开发中,“防止接口重复提交”的实现方式有两类,一类是纯粹的针对请求链接URL的,即防止对同一个URL发起多次请求:此种方式明显粒度过大,容易误伤友军;另一类是针对请求链接URL + 请求体 的,这种方式可以说是比较人性化而且也是比较合理的
以“用户在前端提交注册信息”为例,进行模拟
如果当前提交的请求URL已经存在于缓存中,且 当前提交的请求体 跟 缓存中该URL对应的请求体一毛一样 且 当前请求URL的时间戳跟上次相同请求URL的时间戳 间隔在8s 内,即代表当前请求属于 “重复提交”;如果这其中有一个条件不成立,则意味着当前请求很有可能是第一次请求,或者已经过了8s时间间隔的 第N次请求了,不属于“重复提交”了。
采用的技术为:Spring Boot2.0 + 自定义注解 + 拦截器 + 本地缓存(也可以分布式缓存);
1:首先,定义一个注解
1
2
3
4
5
6
@Inherited
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RepeatSubmit {
}
2:定义一个控制器,处理前端请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@RestController
@RequestMapping("submit")
public class SubmitController extends BaseController{
//用户注册
@RepeatSubmit
@PostMapping("register")
public BaseResponse register(@RequestBody RegisterDto dto) throws Exception{
BaseResponse response=new BaseResponse(StatusCode.Success);
//log.info("用户注册,提交上来的请求信息为:{}",dto);
//将用户信息插入到db
response.setData(dto);
return response;
}
}
////////////////////////////
@Data
public class RegisterDto implements Serializable{
private String userName;
private String nickName;
private Integer age;
}
3:定义一个拦截器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public abstract class RepeatSubmitInterceptor extends HandlerInterceptorAdapter{
//开始拦截
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
if (handler instanceof HandlerMethod){
HandlerMethod handlerMethod= (HandlerMethod) handler;
Method method=handlerMethod.getMethod();
RepeatSubmit submitAnnotation=method.getAnnotation(RepeatSubmit.class);
if (submitAnnotation!=null){
//如果是重复提交,则进行拦截,拒绝请求
if (this.isRepeatSubmit(request)){
BaseResponse subResponse=new BaseResponse(StatusCode.CanNotRepeatSubmit);
CommonUtil.renderString(response,new Gson().toJson(subResponse));
return false;
}
}
return true;
}else{
return super.preHandle(request, response, handler);
}
}
//自定义方法逻辑-判定是否重复提交
// “判断是否重复提交”可以有多种实现方式,而每种实现方式可以通过继承该抽象类 并 实现该抽象方法 从而将其区分开来,某种程度降低了耦合性(面向接口/抽象类编程)
public abstract boolean isRepeatSubmit(HttpServletRequest request);
}
4:判断重复提交实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
@Component
public class SameUrlDataRepeatInterceptor extends RepeatSubmitInterceptor{
private static final String REPEAT_PARAMS = "RepeatParams";
private static final String REPEAT_TIME = "RepeatTime";
//防重提交key
public static final String REPEAT_SUBMIT_KEY = "Repeat_Submit:";
private static final int IntervalTime = 8;
//构建本地缓存,有效时间为8秒钟
private final Cache<String,String> cache= CacheBuilder.newBuilder().expireAfterWrite(IntervalTime, TimeUnit.SECONDS).build();
/**
* 判断是否重复提交,整体的思路:
* 获取当前请求的URL作为键Key,暂且标记为:A1,其取值为映射Map(Map里面的元素由:请求的链接url 和 请求体的数据组成) 暂且标记为V1;
* 从缓存中(本地缓存或者分布式缓存)查找Key=A1的值V2,如果V2和V1的值一样,即代表当前请求是重复提交的,拒绝执行后续的请求,否则可以继续往后面执行
* 其中,设定重复提交的请求的间隔有效时间为8秒
*
* 注意点:如果在有效时间内,如8秒内,一直发起同个请求url、同个请求体,那么重复提交的有效时间将会自动延长
*/
//真正实现“是否重复提交的逻辑”
@Override
public boolean isRepeatSubmit(HttpServletRequest request) {
String currParams=HttpHelper.getBodyString(request);
if (StringUtils.isBlank(currParams)){
currParams=new Gson().toJson(request.getParameterMap());
}
//获取请求地址,充当A1
String url=request.getRequestURI();
//充当B1
RepeatSubmitCacheDto currCacheData=new RepeatSubmitCacheDto(currParams,System.currentTimeMillis(),url);
//充当键A1
String cacheRepeatKey=REPEAT_SUBMIT_KEY+url;
String cacheValue=cache.getIfPresent(cacheRepeatKey);
//从缓存中查找A1对应的值,如果存在,说明当前请求不是第一次了.
if (StringUtils.isNotBlank(cacheValue)){
//充当B2
RepeatSubmitCacheDto preCacheData=new Gson().fromJson(cacheValue,RepeatSubmitCacheDto.class);
if (this.compareParams(currCacheData,preCacheData) && this.compareTime(currCacheData,preCacheData)){
return true;
}
}
//否则,就是第一次请求
Map<String, Object> cacheMap = new HashMap<>();
cacheMap.put(url, currCacheData);
cache.put(cacheRepeatKey,new Gson().toJson(currCacheData));
return false;
}
//比较参数
private boolean compareParams(RepeatSubmitCacheDto currCacheData, RepeatSubmitCacheDto preCacheData){
Boolean res=currCacheData.getRequestData().equals(preCacheData.getRequestData());
return res;
}
//判断两次间隔时间
private boolean compareTime(RepeatSubmitCacheDto currCacheData, RepeatSubmitCacheDto preCacheData){
Boolean res=( (currCacheData.getCurrTime() - preCacheData.getCurrTime()) < (IntervalTime * 1000) );
return res;
}
}
5:全局配置,使拦截器生效
1
2
3
4
5
6
7
8
9
10
@Component
public class CustomWebConfig implements WebMvcConfigurer{
@Autowired
private RepeatSubmitInterceptor submitInterceptor;
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(submitInterceptor);
}
}
6:进行测试验证
23:缓存设计
- 通常我们会使用更快的介质(比如内存)作为缓存,来解决较慢介质(比如磁盘)读取数据慢的问题,缓存是用空间换时间,来解决性能问题的一种架构设计模式。更重要的是,磁盘上存储的往往是原始数据,而缓存中保存的可以是面向呈现的数据。这样一来,缓存不仅仅是加快了 IO,还可以减少原始数据的计算工作。
- 使用 Redis 做缓存虽然简单好用,但使用和设计缓存并不是 set 一下这么简单,需要注意缓存的同步、雪崩、并发、穿透等问题
- 第一,我们不能把诸如 Redis 的缓存数据库完全当作数据库来使用。我们不能假设缓存始终可靠,也不能假设没有过期的数据必然可以被读取到,需要处理好缓存的回源逻辑;而且要显式设置 Redis 的最大内存使用和数据淘汰策略,避免出现 OOM 的问题
- 第二,缓存的性能比数据库好很多,我们需要考虑大量请求绕过缓存直击数据库造成数据库瘫痪的各种情况。对于缓存瞬时大面积失效的缓存雪崩问题,可以通过差异化缓存过期时间解决;对于高并发的缓存 Key 回源问题,可以使用锁来限制回源并发数;对于不存在的数据穿透缓存的问题,可以通过布隆过滤器进行数据存在性的预判,或在缓存中也设置一个值来解决。
- 第三,当数据库中的数据有更新的时候,需要考虑如何确保缓存中数据的一致性。我们看到,“先更新数据库再删除缓存,访问的时候按需加载数据到缓存”的策略是最为妥当的,并且要尽量设置合适的缓存过期时间,这样即便真的发生不一致,也可以在缓存过期后数据得到及时同步
- 在使用缓存系统的时候,要监控缓存系统的内存使用量、命中率、对象平均过期时间等重要指标,以便评估系统的有效性,并及时发现问题
23.1:不要把Redis当作数据库
因为 Redis 中数据消失导致业务逻辑错误,并且因为没有保留原始数据,业务都无法恢复
Redis 的确具有数据持久化功能,可以实现服务重启后数据不丢失。这一点,很容易让我们误认为 Redis 可以作为高性能的 KV 数据库
其实,从本质上来看,Redis(免费版)是一个内存数据库,所有数据保存在内存中,并且直接从内存读写数据响应操作,只不过具有数据持久化能力。所以,Redis 的特点是,处理请求很快,但无法保存超过内存大小的数据
VM 模式虽然可以保存超过内存大小的数据,但是因为性能原因从 2.6 开始已经被废弃。此外,Redis 企业版提供了 Redis on Flash 可以实现 Key+ 字典 + 热数据保存在内存中,冷数据保存在 SSD 中。
因此,把 Redis 用作缓存,我们需要注意两点:
- 第一,从客户端的角度来说,缓存数据的特点一定是有原始数据来源,且允许丢失,即使设置的缓存时间是 1 分钟,在 30 秒时缓存数据因为某种原因消失了,我们也要能接受。当数据丢失后,我们需要从原始数据重新加载数据,不能认为缓存系统是绝对可靠的,更不能认为缓存系统不会删除没有过期的数据
- 第二,从 Redis 服务端的角度来说,缓存系统可以保存的数据量一定是小于原始数据的。首先,我们应该限制 Redis 对内存的使用量,也就是设置 maxmemory 参数;其次,我们应该根据数据特点,明确 Redis 应该以怎样的算法来驱逐数据
常用的数据淘汰策略有:
- allkeys-lru,针对所有 Key,优先删除最近最少使用的 Key
- volatile-lru,针对带有过期时间的 Key,优先删除最近最少使用的 Key
- volatile-ttl,针对带有过期时间的 Key,优先删除即将过期的 Key(根据 TTL 的值)
- allkeys-lfu(Redis 4.0 以上),针对所有 Key,优先删除最少使用的 Key
- volatile-lfu(Redis 4.0 以上),针对带有过期时间的 Key,优先删除最少使用的 Key
其实,这些算法是 Key 范围 +Key 选择算法的搭配组合,其中范围有 allkeys 和 volatile 两种,算法有 LRU、TTL 和 LFU 三种
首先,从算法角度来说,Redis 4.0 以后推出的 LFU 比 LRU 更“实用”。试想一下,如果一个 Key 访问频率是 1 天一次,但正好在 1 秒前刚访问过,那么 LRU 可能不会选择优先淘汰这个 Key,反而可能会淘汰一个 5 秒访问一次但最近 2 秒没有访问过的 Key,而 LFU
算法不会有这个问题。而 TTL 会比较“头脑简单”一点,优先删除即将过期的 Key,但有可能这个 Key 正在被大量访问
然后,从 Key 范围角度来说,allkeys 可以确保即使 Key 没有 TTL 也能回收,如果使用的时候客户端总是“忘记”设置缓存的过期时间,那么可以考虑使用这个系列的算法。而 volatile 会更稳妥一些,万一客户端把 Redis 当做了长效缓存使用,只是启动时候初始化一次缓存,那么一旦删除了此类没有 TTL 的数据,可能就会导致客户端出错
所以,不管是使用者还是管理者都要考虑 Redis 的使用方式,使用者需要考虑应该以缓存的姿势来使用 Redis,管理者应该为 Redis 设置内存限制和合适的驱逐策略,避免出现 OOM
23.2:注意缓存雪崩问题
由于缓存系统的 IOPS 比数据库高很多,因此要特别小心短时间内大量缓存失效的情况。这种情况一旦发生,可能就会在瞬间有大量的数据需要回源到数据库查询,对数据库造成极大的压力,极限情况下甚至导致后端数据库直接崩溃。这就是我们常说的缓存失效,也叫作缓存雪崩
从广义上说,产生缓存雪崩的原因有两种:
- 第一种是,缓存系统本身不可用,导致大量请求直接回源到数据库
- 第二种是,应用设计层面大量的 Key 在同一时间过期,导致大量的数据回源
程序初始化的时候放入 1000 条城市数据到 Redis 缓存中,过期时间是 30 秒;数据过期后从数据库获取数据然后写入缓存,每次从数据库获取数据后计数器 +1;在程序启动的同时,启动一个定时任务线程每隔一秒输出计数器的值,并把计数器归零;压测一个随机查询某城市信息的接口,观察一下数据库的 QPS
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
@Slf4j @RequestMapping("cacheinvalid") @RestController public class CacheInvalidController { @Autowired private StringRedisTemplate stringRedisTemplate; private AtomicInteger atomicInteger = new AtomicInteger(); //@PostConstruct public void wrongInit() { //初始化1000个城市数据到Redis,所有缓存数据有效期30秒 IntStream.rangeClosed(1, 1000).forEach(i -> stringRedisTemplate.opsForValue().set("city" + i, getCityFromDb(i), 30, TimeUnit.SECONDS)); log.info("Cache init finished"); //每秒一次,输出数据库访问的QPS //启动程序 30 秒后缓存过期,回源的数据库 QPS 最高达到了 700 多 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("DB QPS : {}", atomicInteger.getAndSet(0)); }, 0, 1, TimeUnit.SECONDS); } /* 方案一,差异化缓存过期时间,不要让大量的 Key 在同一时间过期。比如,在初始化缓存的时候,设置缓存的过期时间是 30 秒 +10 秒以内的随机延迟(扰动值)。这样,这些Key 不会集中在 30 秒这个时刻过期,而是会分散在 30~40 秒之间过期 修改后,缓存过期时的回源不会集中在同一秒,数据库的 QPS 从 700 多降到了最高 100左右 */ //@PostConstruct public void rightInit1() { //这次缓存的过期时间是30秒+10秒内的随机延迟 IntStream.rangeClosed(1, 1000).forEach(i -> stringRedisTemplate.opsForValue().set("city" + i, getCityFromDb(i), 30 + ThreadLocalRandom.current().nextInt(10), TimeUnit.SECONDS)); log.info("Cache init finished"); //同样1秒一次输出数据库QPS: Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("DB QPS : {}", atomicInteger.getAndSet(0)); }, 0, 1, TimeUnit.SECONDS); } /* 方案二,让缓存不主动过期。初始化缓存数据的时候设置缓存永不过期,然后启动一个后台线程 30 秒一次定时把所有数据更新到缓存,而且通过适当的休眠,控制从数据库更新数据的频率,降低数据库压力 这样修改后,虽然缓存整体更新的耗时在 21 秒左右,但数据库的压力会比较稳定 */ @PostConstruct public void rightInit2() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(1); //每隔30秒全量更新一次缓存 Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { IntStream.rangeClosed(1, 1000).forEach(i -> { String data = getCityFromDb(i); //模拟更新缓存需要一定的时间 try { TimeUnit.MILLISECONDS.sleep(20); } catch (InterruptedException e) { } if (!StringUtils.isEmpty(data)) { //缓存永不过期,被动更新 stringRedisTemplate.opsForValue().set("city" + i, data); } }); log.info("Cache update finished"); //启动程序的时候需要等待首次更新缓存完成 countDownLatch.countDown(); }, 0, 30, TimeUnit.SECONDS); Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("DB QPS : {}", atomicInteger.getAndSet(0)); }, 0, 1, TimeUnit.SECONDS); countDownLatch.await(); } @GetMapping("city") public String city() { //随机查询一个城市 int id = ThreadLocalRandom.current().nextInt(1000) + 1; String key = "city" + id; String data = stringRedisTemplate.opsForValue().get(key); if (data == null) { //回源到数据库查询 data = getCityFromDb(id); if (!StringUtils.isEmpty(data)) //缓存30秒过期 stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS); } return data; } //模拟查询数据库,查一次增加计数器加一 private String getCityFromDb(int cityId) { atomicInteger.incrementAndGet(); return "citydata" + System.currentTimeMillis(); } }
关于这两种解决方案,我们需要特别注意以下三点:
- 方案一和方案二是截然不同的两种缓存方式,如果无法全量缓存所有数据,那么只能使用方案一
- 即使使用了方案二,缓存永不过期,同样需要在查询的时候,确保有回源的逻辑。正如之前所说,我们无法确保缓存系统中的数据永不丢失
- 不管是方案一还是方案二,在把数据从数据库加入缓存的时候,都需要判断来自数据库的数据是否合法,比如进行最基本的判空检查
- 缓存会让我们更不容易发现原始数据的问题,所以在把数据加入缓存之前一定要校验数据,如果发现有明显异常要及时报警
23.3:注意缓存击穿问题
在某些 Key 属于极端热点数据,且并发量很大的情况下,如果这个 Key 过期,可能会在某个瞬间出现大量的并发请求同时回源,相当于大量的并发请求直接打到了数据库。这种情况,就是我们常说的缓存击穿或缓存并发问题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
@Slf4j @RequestMapping("cacheconcurrent") @RestController public class CacheConcurrentController { @Autowired private StringRedisTemplate stringRedisTemplate; private AtomicInteger atomicInteger = new AtomicInteger(); @Autowired private RedissonClient redissonClient; /* 在程序启动的时候,初始化一个热点数据到 Redis 中,过期时间设置为 5 秒,每隔 1 秒输出一下回源的 QPS */ @PostConstruct public void init() { //初始化一个热点数据到Redis中,过期时间设置为5秒 stringRedisTemplate.opsForValue().set("hotsopt", getExpensiveData(), 5, TimeUnit.SECONDS); //每隔1秒输出一下回源的QPS Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("DB QPS : {}", atomicInteger.getAndSet(0)); }, 0, 1, TimeUnit.SECONDS); } /* 可以看到,每隔 5 秒数据库都有 20 左右的 QPS */ @GetMapping("wrong") public String wrong() { String data = stringRedisTemplate.opsForValue().get("hotsopt"); if (StringUtils.isEmpty(data)) { data = getExpensiveData(); //重新加入缓存,过期时间还是5秒 stringRedisTemplate.opsForValue().set("hotsopt", data, 5, TimeUnit.SECONDS); } return data; } /* 如果回源操作特别昂贵,那么这种并发就不能忽略不计。这时,我们可以考虑使用锁机制来限制回源的并发。比如如下代码示例,使用 Redisson 来获取一个基于 Redis 的分布式锁,在查询数据库之前先尝试获取锁 这样,可以把回源到数据库的并发限制在 1 */ @GetMapping("right") public String right() { String data = stringRedisTemplate.opsForValue().get("hotsopt"); if (StringUtils.isEmpty(data)) { RLock locker = redissonClient.getLock("locker"); //获取分布式锁 if (locker.tryLock()) { try { data = stringRedisTemplate.opsForValue().get("hotsopt"); //双重检查,因为可能已经有一个B线程过了第一次判断,在等锁,然后A线程已经把数据写入了Redis中 if (StringUtils.isEmpty(data)) { //回源到数据库查询 data = getExpensiveData(); stringRedisTemplate.opsForValue().set("hotsopt", data, 5, TimeUnit.SECONDS); } } finally { //别忘记释放,另外注意写法,获取锁后整段代码try+finally,确保unlock万无一失 locker.unlock(); } } } return data; } private String getExpensiveData() { atomicInteger.incrementAndGet(); return "important data"; } }
在真实的业务场景下,不一定要这么严格地使用双重检查分布式锁进行全局的并发限制,因为这样虽然可以把数据库回源并发降到最低,但也限制了缓存失效时的并发。可以考虑的方式是:
- 方案一,使用进程内的锁进行限制,这样每一个节点都可以以一个并发回源数据库
- 方案二,不使用锁进行限制,而是使用类似 Semaphore 的工具限制并发数,比如限制为 10,这样既限制了回源并发数不至于太大,又能使得一定量的线程可以同时回源
23.4:注意缓存穿透问题
缓存回源的逻辑都是当缓存中查不到需要的数据时,回源到数据库查询。这里容易出现的一个漏洞是,缓存中没有数据不一定代表数据没有缓存,还有一种可能是原始数据压根就不存在。
这里需要注意,缓存穿透和缓存击穿的区别:
- 缓存穿透是指,缓存没有起到压力缓冲的作用
- 而缓存击穿是指,缓存失效时瞬时的并发打到数据库
解决缓存穿透有以下两种方案:
- 方案一,对于不存在的数据,同样设置一个特殊的 Value 到缓存中,比如当数据库中查出的用户信息为空的时候,设置 NODATA 这样具有特殊含义的字符串到缓存中。这样下次请求缓存的时候还是可以命中缓存,即直接从缓存返回结果,不查询数据库,对应right方法
- 使用布隆过滤器做前置过滤
布隆过滤器是一种概率型数据库结构,由一个很长的二进制向量和一系列随机映射函数组成。它的原理是,当一个元素被加入集合时,通过 k 个散列函数将这个元素映射成一个 m位 bit 数组中的 k 个点,并置为 1。检索时,我们只要看看这些点是不是都是 1 就(大概)知道集合中有没有它了。如果这些点有任何一个 0,则被检元素一定不在;如果都是 1,则被检元素很可能在。
布隆过滤器不保存原始值,空间效率很高,平均每一个元素占用 2.4 字节就可以达到万分之一的误判率。这里的误判率是指,过滤器判断值存在而实际并不存在的概率。我们可以设置布隆过滤器使用更大的存储空间,来得到更小的误判率。
你可以把所有可能的值保存在布隆过滤器中,从缓存读取数据前先过滤一次:
- 如果布隆过滤器认为值不存在,那么值一定是不存在的,无需查询缓存也无需查询数据库
- 对于极小概率的误判请求,才会最终让非法 Key 的请求走到缓存或数据库
其实,方案二可以和方案一同时使用,即将布隆过滤器前置,对于误判的情况再保存特殊值到缓存,双重保险避免无效数据查询请求打到数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
@Slf4j @RequestMapping("cachepenetration") @RestController public class CachePenetrationController { @Autowired private StringRedisTemplate stringRedisTemplate; private AtomicInteger atomicInteger = new AtomicInteger(); private BloomFilter<Integer> bloomFilter; @PostConstruct public void init() { Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> { log.info("DB QPS : {}", atomicInteger.getAndSet(0)); }, 0, 1, TimeUnit.SECONDS); //创建布隆过滤器,元素数量10000,期望误判率1% bloomFilter = BloomFilter.create(Funnels.integerFunnel(), 10000, 0.01); //填充布隆过滤器 IntStream.rangeClosed(1, 10000).forEach(bloomFilter::put); } /* 数据库中只保存有 ID 介于 0(不含)和 10000(包含)之间的用户,如果从数据库查询 ID 不在这个区间的用户,会得到空字符串,所以缓存中缓存的也是空字符串。如果使用 ID=0 去压接口的话,从缓存中查出了空字符串,认为是缓存中没有数据回源查询,其实相当于每次都回源 压测后数据库的 QPS 达到了几千 如果这种漏洞被恶意利用的话,就会对数据库造成很大的性能压力。这就是缓存穿透 */ @GetMapping("wrong") public String wrong(@RequestParam("id") int id) { String key = "user" + id; String data = stringRedisTemplate.opsForValue().get(key); //无法区分是无效用户还是缓存失效 if (StringUtils.isEmpty(data)) { data = getCityFromDb(id); stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS); } return data; } /* 但,这种方式可能会把大量无效的数据加入缓存中,如果担心大量无效数据占满缓存的话还可以考虑方案二,即使用布隆过滤器做前置过滤 */ @GetMapping("right") public String right(@RequestParam("id") int id) { String key = "user" + id; String data = stringRedisTemplate.opsForValue().get(key); if (StringUtils.isEmpty(data)) { data = getCityFromDb(id); //校验从数据库返回的数据是否有效 if (!StringUtils.isEmpty(data)) { stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS); } else { //如果无效,直接在缓存中设置一个NODATA,这样下次查询时即使是无效用户还是可以命中缓存 stringRedisTemplate.opsForValue().set(key, "NODATA", 30, TimeUnit.SECONDS); } } return data; } /* 要用上布隆过滤器,我们可以使用 Google 的 Guava 工具包提供的 BloomFilter 类改造一下程序:启动时,初始化一个具有所有有效用户 ID 的、10000 个元素的 BloomFilter,在从缓存查询数据之前调用其 mightContain 方法,来检测用户 ID 是否可能存在;如果布隆过滤器说值不存在,那么一定是不存在的,直接返回 对于方案二,我们需要同步所有可能存在的值并加入布隆过滤器,这是比较麻烦的地方。如果业务规则明确的话,你也可以考虑直接根据业务规则判断值是否存在 */ @GetMapping("right2") public String right2(@RequestParam("id") int id) { String data = ""; //通过布隆过滤器先判断 if (bloomFilter.mightContain(id)) { String key = "user" + id; //走缓存查询 data = stringRedisTemplate.opsForValue().get(key); if (StringUtils.isEmpty(data)) { //走数据库查询 data = getCityFromDb(id); stringRedisTemplate.opsForValue().set(key, data, 30, TimeUnit.SECONDS); } } return data; } private String getCityFromDb(int id) { atomicInteger.incrementAndGet(); //注意,只有ID介于0(不含)和10000(包含)之间的用户才是有效用户,可以查询到用户信息 if (id > 0 && id <= 10000) return "userdata"; //否则返回空字符串 return ""; } }
23.5:注意缓存数据同步策略
- 前面提到的 3 个案例,其实都属于缓存数据过期后的被动删除。在实际情况下,修改了原始数据后,考虑到缓存数据更新的及时性,我们可能会采用主动更新缓存的策略。这些策略可能是:
- 先更新缓存,再更新数据库;
- 先更新数据库,再更新缓存;
- 先删除缓存,再更新数据库,访问的时候按需加载数据到缓存;
- 先更新数据库,再删除缓存,访问的时候按需加载数据到缓存。
- 先更新缓存再更新数据库”策略不可行。数据库设计复杂,压力集中,数据库因为超时等原因更新操作失败的可能性较大,此外还会涉及事务,很可能因为数据库更新失败,导致缓存和数据库的数据不一致。
- 先更新数据库再更新缓存”策略不可行。一是,如果线程 A 和 B 先后完成数据库更新,但更新缓存时却是 B 和 A 的顺序,那很可能会把旧数据更新到缓存中引起数据不一致;二是,我们不确定缓存中的数据是否会被访问,不一定要把所有数据都更新到缓存中去
- 先删除缓存再更新数据库,访问的时候按需加载数据到缓存”策略也不可行。在并发的情况下,很可能删除缓存后还没来得及更新数据库,就有另一个线程先读取了旧值到缓存中,如果并发量很大的话这个概率也会很大
- 先更新数据库再删除缓存,访问的时候按需加载数据到缓存”策略是最好的。虽然在极端情况下,这种策略也可能出现数据不一致的问题,但概率非常低,基本可以忽略。举一个“极端情况”的例子,比如更新数据的时间节点恰好是缓存失效的瞬间,这时 A 先读取到了旧值,随后在 B 操作数据库完成更新并且删除了缓存之后,A 再把旧值加入缓存
- 需要注意的是,更新数据库后删除缓存的操作可能失败,如果失败则考虑把任务加入延迟队列进行延迟重试,确保数据可以删除,缓存可以及时更新。因为删除操作是幂等的,所以即使重复删问题也不是太大,这又是删除比更新好的一个原因。
- 因此,针对缓存更新更推荐的方式是,缓存中的数据不由数据更新操作主动触发,统一在需要使用的时候按需加载,数据更新后及时删除缓存中的数据即可。
23.6:思考与讨论
- 热点 Key 回源会对数据库产生的压力问题,如果 Key特别热的话,可能缓存系统也无法承受,毕竟所有的访问都集中打到了一台缓存服务器。如果我们使用 Redis 来做缓存,那可以把一个热点 Key 的缓存查询压力,分散到多个 Redis 节点上吗?
- 可以给hotkey加上后缀,让这些hotkey打散到不同的redis实例上。分型一个场景:假如在一个非常热点的数据,数据更新不是很频繁,但是查询非常的频繁,要保证基本保证100%的缓存命中率,该怎么处理?我们的做法是,空间换效率,同一个key保留2份,1个不带后缀,1个带后缀,不带的后缀的有ttl,带后缀的没有,先查询不带后缀的,查询不到,做两件事情:1、后台程序查询DB更新缓存;2查询带后缀返回给调用方。这样可以尽可能的避免缓存击穿而引起的数据库挂了。
- 大 Key 也是数据缓存容易出现的一个问题。如果一个 Key 的 Value 特别大,那么可能会对 Redis 产生巨大的性能影响,因为 Redis 是单线程模型,对大 Key 进行查询或删除等操作,可能会引起 Redis 阻塞甚至是高可用切换。你知道怎么查询 Redis 中的大Key,以及如何在设计上实现大 Key 的拆分吗?
- 1:单个key存储的value很大;key分为2种类型: 第一:该key需要每次都整存整取可以尝试将对象分拆成几个key-value, 使用multiGet获取值,这样分拆的意义在于分拆单次操作的压力,将操作压力平摊到多个redis实例中,降低对单个redis的IO影响; 第二:该对象每次只需要存取部分数据;可以像第一种做法一样,分拆成几个key-value, 也可以将这个存储在一个hash中,每个field代表一个具体的属性,使用hget,hmget来获取部分的value,使用hset,hmset来更新部分属性。
- 2、一个集群存储了上亿的key 如果key的个数过多会带来更多的内存空间占用,第一:key本身的占用(每个key 都会有一个Category前缀);第二:集群模式中,服务端需要建立一些slot2key的映射关系,这其中的指针占用在key多的情况下也是浪费巨大空间;这两个方面在key个数上亿的时候消耗内存十分明显(Redis 3.2及以下版本均存在这个问题,4.0有优化);所以减少key的个数可以减少内存消耗,可以参考的方案是转Hash结构存储,即原先是直接使用Redis String 的结构存储,现在将多个key存储在一个Hash结构中,具体场景参考如下: 一: key 本身就有很强的相关性,比如多个key 代表一个对象,每个key是对象的一个属性,这种可直接按照特定对象的特征来设置一个新Key——Hash结构, 原先的key则作为这个新Hash 的field。 二: key 本身没有相关性,预估一下总量,预分一个固定的桶数量;比如现在预估key 的总数为 2亿,按照一个hash存储 100个field来算,需要 2亿 / 100 = 200W 个桶 (200W 个key占用的空间很少,2亿可能有将近 20G )现在按照200W 固定桶分就是先计算出桶的序号 hash(123456789) % 200W , 这里最好保证这个 hash算法的值是个正数,否则需要调整下模除的规则;这样算出三个key 的桶分别是 1 , 2, 2。 所以存储的时候调用API hset(key, field, value),读取的时候使用 hget (key, field)注意两个地方:1,hash 取模对负数的处理; 2,预分桶的时候, 一个hash 中存储的值最好不要超过 512 ,100 左右较为合适
24:开发转生产
- 所谓生产就绪(Production-ready),是指应用开发完成要投入生产环境,开发层面需要额外做的一些工作。在我看来,如果应用只是开发完成了功能代码,然后就直接投产,那意味着应用其实在裸奔。在这种情况下,遇到问题因为缺乏有效的监控导致无法排查定位问题,同时很可能遇到问题我们自己都不知道,需要依靠用户反馈才知道应用出了问题。
- 生产就绪需要做哪些工作呢?
- 第一,提供健康检测接口。传统采用 ping 的方式对应用进行探活检测并不准确。有的时候,应用的关键内部或外部依赖已经离线,导致其根本无法正常工作,但其对外的 Web 端口或管理端口是可以 ping 通的。我们应该提供一个专有的监控检测接口,并尽可能触达一些内部组件。比如可以提供一个返回success的controller接口。
- 第二,暴露应用内部信息。应用内部诸如线程池、内存队列等组件,往往在应用内部扮演了重要的角色,如果应用或应用框架可以对外暴露这些重要信息,并加以监控,那么就有可能在诸如 OOM 等重大问题暴露之前发现蛛丝马迹,避免出现更大的问题。
- 第三,建立应用指标 Metrics 监控。Metrics 可以翻译为度量或者指标,指的是对于一些关键信息以可聚合的、数值的形式做定期统计,并绘制出各种趋势图表。这里的指标监控,包括两个方面:一是,应用内部重要组件的指标监控,比如 JVM 的一些指标、接口的 QPS等;二是,应用的业务数据的监控,比如电商订单量、游戏在线人数等。
24.1:配置 Spring Boot Actuator
Spring Boot 有一个 Actuator 模块,封装了诸如健康检测、应用内部信息、Metrics 指标等生产就绪的功能。
pom
1 2 3 4
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
注意一些重要的配置:
如果你不希望 Web 应用的 Actuator 管理端口和应用端口重合的话,可以使用management.server.port 设置独立的端口。
Actuator 自带了很多开箱即用提供信息的端点(Endpoint),可以通过 JMX 或 Web两种方式进行暴露。考虑到有些信息比较敏感,这些内置的端点默认不是完全开启的,你可以通过官网查看这些默认值。在这里,为了方便后续 Demo,我们设置所有端点通过 Web 方式开启。
默认情况下,Actuator 的 Web 访问方式的根地址为 /actuator,可以通过management.endpoints.web.base-path 参数进行修改。我来演示下,如何将其修改为 /admin。
1 2 3 4
management.server.port=45679 management.endpoints.web.exposure.include=* management.endpoints.web.base-path=/admin- 现在,你就可以访问 http://localhost:45679/admin,来查看 Actuator 的所有功能URL 了
- 其中,大部分端点提供的是只读信息,比如查询 Spring 的 Bean、ConfigurableEnvironment、定时任务、SpringBoot 自动配置、Spring MVC 映射等;少部分端点还提供了修改功能,比如优雅关闭程序、下载线程 Dump、下载堆 Dump、修改日志级别等
24.2:健康检查要触达关键组件
健康检测接口可以让监控系统或发布工具知晓应用的真实健康状态,比 ping 应用端口更可靠。不过,要达到这种效果最关键的是,我们能确保健康检测接口可以探查到关键组件的状态。
Spring Boot Actuator 帮我们预先实现了诸如数据库、InfluxDB、Elasticsearch、Redis、RabbitMQ 等三方系统的健康检测指示器 HealthIndicator
通过 Spring Boot 的自动配置,这些指示器会自动生效。当这些组件有问题的时候,HealthIndicator 会返回 DOWN 或 OUT_OF_SERVICE 状态,health 端点 HTTP 响应状态码也会变为 503,我们可以以此来配置程序健康状态监控报警。
我们可以修改配置文件,把 management.endpoint.health.show-details 参数设置为 always,让所有用户都可以直接查看各个组件的健康情况(如果配置为 when-authorized,那么可以结合 management.endpoint.health.roles 配置授权的角色)
访问 health 端点可以看到,数据库、磁盘、RabbitMQ、Redis 等组件健康状态是 UP,整个应用的状态也是 UP
我们考虑一下,如果程序依赖一个很重要的三方服务,我们希望这个服务无法访问的时候,应用本身的健康状态也是 DOWN。比如三方服务有一个 user 接口,出现异常的概率是 50%
1 2 3 4 5 6 7 8 9 10 11 12 13 14
@Slf4j @RestController @RequestMapping("user") public class UserServiceController { @GetMapping public User getUser(@RequestParam("userId") long id) { //一半概率返回正确响应,一半概率抛异常 if (ThreadLocalRandom.current().nextInt() % 2 == 0) return new User(id, "name" + id); else throw new RuntimeException("error"); } }
要实现这个 user 接口是否正确响应和程序整体的健康状态挂钩的话,很简单,只需定义一个 UserServiceHealthIndicator 实现 HealthIndicator 接口即可。
在 health 方法中,我们通过 RestTemplate 来访问这个 user 接口,如果结果正确则返回Health.up(),并把调用执行耗时和结果作为补充信息加入 Health 对象中。如果调用接口出现异常,则返回 Health.down(),并把异常信息作为补充信息加入 Health 对象中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
@Component @Slf4j public class UserServiceHealthIndicator implements HealthIndicator { @Autowired private RestTemplate restTemplate; @Override public Health health() { long begin = System.currentTimeMillis(); long userId = 1L; User user = null; try { //访问远程接口 user = restTemplate.getForObject("http://localhost:45678/user?userId=" + userId, User.class); if (user != null && user.getUserId() == userId) { //结果正确,返回UP状态,补充提供耗时和用户信息 return Health.up() .withDetail("user", user) .withDetail("took", System.currentTimeMillis() - begin) .build(); } else { //结果不正确,返回DOWN状态,补充提供耗时 return Health.down().withDetail("took", System.currentTimeMillis() - begin).build(); } } catch (Exception ex) { //出现异常,先记录异常,然后返回DOWN状态,补充提供异常信息和耗时 log.warn("health check failed!", ex); return Health.down(ex).withDetail("took", System.currentTimeMillis() - begin).build(); } } }
我们再来看一个聚合多个 HealthIndicator 的案例,也就是定义一个CompositeHealthContributor 来聚合多个 HealthContributor,实现一组线程池的监控
首先,在 ThreadPoolProvider 中定义两个线程池,其中 demoThreadPool 是包含一个工作线程的线程池,类型是 ArrayBlockingQueue,阻塞队列的长度为 10;还有一个ioThreadPool 模拟 IO 操作线程池,核心线程数 10,最大线程数 50
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
public class ThreadPoolProvider { //一个工作线程的线程池,队列长度10 private static ThreadPoolExecutor demoThreadPool = new ThreadPoolExecutor( 1, 1, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get()); //核心线程数10,最大线程数50的线程池,队列长度50 private static ThreadPoolExecutor ioThreadPool = new ThreadPoolExecutor( 10, 50, 2, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder().setNameFormat("io-threadpool-%d").get()); public static ThreadPoolExecutor getDemoThreadPool() { return demoThreadPool; } public static ThreadPoolExecutor getIOThreadPool() { return ioThreadPool; } }
然后,我们定义一个接口,来把耗时很长的任务提交到这个 demoThreadPool 线程池,以模拟线程池队列满的情况
1 2 3 4 5 6 7 8 9
@GetMapping("slowTask") public void slowTask() { ThreadPoolProvider.getDemoThreadPool().execute(() -> { try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { } }); }
做了这些准备工作后,让我们来真正实现自定义的 HealthIndicator 类,用于单一线程池的健康状态;我们可以传入一个 ThreadPoolExecutor,通过判断队列剩余容量来确定这个组件的健康状态,有剩余量则返回 UP,否则返回 DOWN,并把线程池队列的两个重要数据,也就是当前队列元素个数和剩余量,作为补充信息加入 Health
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
public class ThreadPoolHealthIndicator implements HealthIndicator { private ThreadPoolExecutor threadPool; public ThreadPoolHealthIndicator(ThreadPoolExecutor threadPool) { this.threadPool = threadPool; } @Override public Health health() { //补充信息 Map<String, Integer> detail = new HashMap<>(); //队列当前元素个数 detail.put("queue_size", threadPool.getQueue().size()); //队列剩余容量 detail.put("queue_remaining", threadPool.getQueue().remainingCapacity()); //如果还有剩余量则返回UP,否则返回DOWN if (threadPool.getQueue().remainingCapacity() > 0) { return Health.up().withDetails(detail).build(); } else { return Health.down().withDetails(detail).build(); } } }
再定义一个 CompositeHealthContributor,来聚合两个 ThreadPoolHealthIndicator 的实例,分别对应 ThreadPoolProvider 中定义的两个线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Component public class ThreadPoolsHealthContributor implements CompositeHealthContributor { //保存所有的子HealthContributor private Map<String, HealthContributor> contributors = new HashMap<>(); ThreadPoolsHealthContributor() { //对应ThreadPoolProvider中定义的两个线程池 this.contributors.put("demoThreadPool", new ThreadPoolHealthIndicator(ThreadPoolProvider.getDemoThreadPool())); this.contributors.put("ioThreadPool", new ThreadPoolHealthIndicator(ThreadPoolProvider.getIOThreadPool())); } @Override public HealthContributor getContributor(String name) { //根据name找到某一个HealthContributor return contributors.get(name); } @Override public Iterator<NamedContributor<HealthContributor>> iterator() { //返回NamedContributor的迭代器,NamedContributor也就是Contributor实例+一个命名 return contributors.entrySet().stream() .map((entry) -> NamedContributor.of(entry.getKey(), entry.getValue())).iterator(); } }
程序启动后可以看到,health 接口展现了线程池和外部服务 userService 的健康状态,以及一些具体信息;我们看到一个 demoThreadPool 为 DOWN 导致父 threadPools 为 DOWN,进一步导致整个程序的 status 为 DOWN
- 以上,就是通过自定义 HealthContributor 和 CompositeHealthContributor,来实现监控检测触达程序内部诸如三方服务、线程池等关键组件;额外补充一下,Spring Boot 2.3.0增强了健康检测的功能,细化了 Liveness 和Readiness 两个端点,便于 Spring Boot 应用程序和 Kubernetes 整合
24.3:对外暴露应用内部重要组件的状态
除了可以把线程池的状态作为整个应用程序是否健康的依据外,我们还可以通过 Actuator的 InfoContributor 功能,对外暴露程序内部重要组件的状态数据。用一个例子演示使用 info 的 HTTP 端点、JMX MBean 这两种方式,如何查看状态数据。
实现一个 ThreadPoolInfoContributor 来展现线程池的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
@Component public class ThreadPoolInfoContributor implements InfoContributor { private static Map threadPoolInfo(ThreadPoolExecutor threadPool) { Map<String, Object> info = new HashMap<>(); //当前池大小 info.put("poolSize", threadPool.getPoolSize()); //核心池大小 info.put("corePoolSize", threadPool.getCorePoolSize()); //最大达到过的池大小 info.put("largestPoolSize", threadPool.getLargestPoolSize()); //最大池大小 info.put("maximumPoolSize", threadPool.getMaximumPoolSize()); //总完成任务数 info.put("completedTaskCount", threadPool.getCompletedTaskCount()); return info; } @Override public void contribute(Info.Builder builder) { builder.withDetail("demoThreadPool", threadPoolInfo(ThreadPoolProvider.getDemoThreadPool())); builder.withDetail("ioThreadPool", threadPoolInfo(ThreadPoolProvider.getIOThreadPool())); } }
- 访问 /admin/info 接口
24.4:利用Metrics快速定位问题
指标是指一组和时间关联的、衡量某个维度能力的量化数值。通过收集指标并展现为曲线图、饼图等图表,可以帮助我们快速定位、分析问题
有一个外卖订单的下单和配送流程,如下图所示。OrderController 进行下单操作,下单操作前先判断参数,如果参数正确调用另一个服务查询商户状态,如果商户在营业的话继续下单,下单成功后发一条消息到 RabbitMQ 进行异步配送流程;然后另一个DeliverOrderHandler 监听这条消息进行配送操作。
对于这样一个涉及同步调用和异步调用的业务流程,如果用户反馈下单失败,那我们如何才能快速知道是哪个环节出了问题呢?
这时,指标体系就可以发挥作用了。我们可以分别为下单和配送这两个重要操作,建立一些指标进行监控。对于下单操作,可以建立 4 个指标:
- 下单总数量指标,监控整个系统当前累计的下单量;
- 下单请求指标,对于每次收到下单请求,在处理之前 +1;
- 下单成功指标,每次下单成功完成 +1;
- 下单失败指标,下单操作处理出现异常 +1,并且把异常原因附加到指标上。
对于配送操作,也是建立类似的 4 个指标。我们可以使用 Micrometer 框架实现指标的收集,它也是 Spring Boot Actuator 选用的指标框架。它实现了各种指标的抽象,常用的有三种:
- gauge(红色),它反映的是指标当前的值,是多少就是多少,不能累计,比如本例中的下单总数量指标,又比如游戏的在线人数、JVM 当前线程数都可以认为是 gauge。
- counter(绿色),每次调用一次方法值增加 1,是可以累计的,比如本例中的下单请求指标。举一个例子,如果 5 秒内我们调用了 10 次方法,Micrometer 也是每隔 5 秒把指标发送给后端存储系统一次,那么它可以只发送一次值,其值为 10。
- timer(蓝色),类似 counter,只不过除了记录次数,还记录耗时,比如本例中的下单成功和下单失败两个指标。
所有的指标还可以附加一些 tags 标签,作为补充数据。比如,当操作执行失败的时候,我们就会附加一个 reason 标签到指标上。
Micrometer 除了抽象了指标外,还抽象了存储。你可以把 Micrometer 理解为类似 SLF4J这样的框架,只不过后者针对日志抽象,而 Micrometer 是针对指标进行抽象。Micrometer 通过引入各种 registry,可以实现无缝对接各种监控系统或时间序列数据库。
在这个案例中,我们引入了 micrometer-registry-influx 依赖,目的是引入 Micrometer的核心依赖,以及通过 Micrometer 对于InfluxDB(InfluxDB 是一个时间序列数据库,其专长是存储指标数据)的绑定,以实现指标数据可以保存到 InfluxDB
1 2 3 4
<dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-influx</artifactId> </dependency>
然后,修改配置文件,启用指标输出到 InfluxDB 的开关、配置 InfluxDB 的地址,以及设置指标每秒在客户端聚合一次,然后发送到 InfluxDB
1 2 3 4
management.metrics.export.influx.enabled=true management.metrics.export.influx.uri=http://localhost:8086 management.metrics.export.influx.step=1S
接下来,我们在业务逻辑中增加相关的代码来记录指标。
当用户 ID<10 的时候,我们模拟用户数据无效的情况,当商户 ID 不为 2 的时候我们模拟商户不营业的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
//下单操作,以及商户服务的接口 @Slf4j @RestController @RequestMapping("order") public class OrderController { //总订单创建数量 private AtomicLong createOrderCounter = new AtomicLong(); private RabbitAdmin rabbitAdmin; @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RestTemplate restTemplate; @PostConstruct public void init() { //注册createOrder.received指标,gauge指标只需要像这样初始化一次,直接关联到AtomicLong引用即可 Metrics.gauge("createOrder.totalSuccess", createOrderCounter); } //下单接口,提供用户ID和商户ID作为入参 @GetMapping("createOrder") public void createOrder(@RequestParam("userId") long userId, @RequestParam("merchantId") long merchantId) { //记录一次createOrder.received指标,表示收到下单请求 Metrics.counter("createOrder.received").increment(); Instant begin = Instant.now(); try { TimeUnit.MILLISECONDS.sleep(200); //模拟无效用户的情况,ID<10的为无效用户 if (userId < 10) throw new RuntimeException("invalid user"); //查询商户服务 Boolean merchantStatus = restTemplate.getForObject("http://localhost:45678/order/getMerchantStatus?merchantId=" + merchantId, Boolean.class); if (merchantStatus == null || !merchantStatus) throw new RuntimeException("closed merchant"); Order order = new Order(); order.setId(createOrderCounter.incrementAndGet()); //gauge指标可以得到自动更新 order.setUserId(userId); order.setMerchantId(merchantId); //发送MQ消息 rabbitTemplate.convertAndSend(Consts.EXCHANGE, Consts.ROUTING_KEY, order); //记录一次createOrder.success指标,表示下单成功,同时提供耗时 Metrics.timer("createOrder.success").record(Duration.between(begin, Instant.now())); } catch (Exception ex) { log.error("creareOrder userId {} failed", userId, ex); //记录一次createOrder.failed指标,表示下单失败,同时提供耗时,并且以tag记录失败原因 Metrics.timer("createOrder.failed", "reason", ex.getMessage()).record(Duration.between(begin, Instant.now())); } } //商户查询接口 @GetMapping("getMerchantStatus") public boolean getMerchantStatus(@RequestParam("merchantId") long merchantId) throws InterruptedException { //只有商户ID为2的商户才是营业的 TimeUnit.MILLISECONDS.sleep(200); return merchantId == 2; } }
接下来是 DeliverOrderHandler 配送服务的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
//配送服务消息处理程序 @RestController @Slf4j @RequestMapping("deliver") public class DeliverOrderHandler { //配送服务运行状态 private volatile boolean deliverStatus = true; private AtomicLong deliverCounter = new AtomicLong(); //通过一个外部接口来改变配送状态模拟配送服务停工 @PostMapping("status") public void status(@RequestParam("status") boolean status) { deliverStatus = status; } @PostConstruct public void init() { //同样注册一个gauge指标deliverOrder.totalSuccess,代表总的配送单量,只需注册一次即可 Metrics.gauge("deliverOrder.totalSuccess", deliverCounter); } //监听MQ消息 @RabbitListener(queues = Consts.QUEUE, concurrency = "5") public void deliverOrder(Order order) { Instant begin = Instant.now(); //对deliverOrder.received进行递增,代表收到一次订单消息,counter类型 Metrics.counter("deliverOrder.received").increment(); try { if (!deliverStatus) throw new RuntimeException("deliver outofservice"); TimeUnit.MILLISECONDS.sleep(500); deliverCounter.incrementAndGet(); //配送成功指标deliverOrder.success,timer类型 Metrics.timer("deliverOrder.success").record(Duration.between(begin, Instant.now())); } catch (Exception ex) { log.error("deliver Order {} failed", order, ex); //配送失败指标deliverOrder.failed,同样附加了失败原因作为tags,timer类型 Metrics.timer("deliverOrder.failed", "reason", ex.getMessage()).record(Duration.between(begin, Instant.now())); } } }
同时,我们模拟了一个配送服务整体状态的开关,调用 status 接口可以修改其状态。至此,我们完成了场景准备,接下来开始配置指标监控。
安装Grafana,然后进入 Grafana 配置一个 InfluxDB 数据源;配置好数据源之后,就可以添加一个监控面板,然后在面板中添加各种监控图表。比如,我们在一个下单次数图表中添加了下单收到、成功和失败三个指标。
类似地, 我们配置出一个完整的业务监控面板,包含之前实现的 8 个指标:
- 配置 2 个 Gauge 图表分别呈现总订单完成次数、总配送完成次数。
- 配置 4 个 Graph 图表分别呈现下单操作的次数和性能,以及配送操作的次数和性能。
第一种情况是,使用合法的用户 ID 和营业的商户 ID 运行一段时间
第二种情况是,模拟无效用户 ID 运行一段时间
第三种情况是,尝试一下因为商户不营业导致的下单失败
第四种情况是,配送停止。我们通过 curl 调用接口,来设置配送停止开关
http://localhost:45678/deliver/status?status=false除了手动添加业务监控指标外,Micrometer 框架还帮我们自动做了很多有关 JVM 内部各种数据的指标。
其实,完整的应用监控体系一般由三个方面构成,包括日志 Logging、指标 Metrics 和追踪 Tracing。
追踪也叫做全链路追踪,比较有代表性的开源系统是SkyWalking和Pinpoint。一般而言,接入此类系统无需额外开发,使用其提供的 javaagent 来启动 Java 程序,就可以通过动态修改字节码实现各种组件的改写,以加入追踪代码(类似 AOP)。
全链路追踪的原理是:
- 请求进入第一个组件时,先生成一个 TraceID,作为整个调用链(Trace)的唯一标识
- 对于每次操作,都记录耗时和相关信息形成一个 Span 挂载到调用链上,Span 和 Span之间同样可以形成树状关联,出现远程调用、跨系统调用的时候,把 TraceID 进行透传(比如,HTTP 调用通过请求透传,MQ 消息则通过消息透传)
- 把这些数据汇总提交到数据库中,通过一个 UI 界面查询整个树状调用链
- 同时,我们一般会把 TraceID 记录到日志中,方便实现日志和追踪的关联
对比了日志、指标和追踪的区别和特点
- 完善的监控体系三者缺一不可,它们还可以相互配合,比如通过指标发现性能问题,通过追踪定位性能问题所在的应用和操作,最后通过日志定位出具体请求的明细参数。
24.5:思考与讨论
- Spring Boot Actuator 提供了大量内置端点,你觉得端点和自定义一个@RestController 有什么区别呢?你能否根据官方文档,开发一个自定义端点呢?
- 在介绍指标 Metrics 时我们看到,InfluxDB 中保存了由 Micrometer 框架自动帮我们收集的一些应用指标。你能否参考源码中两个 Grafana 配置的 JSON 文件,把这些指标在Grafana 中配置出一个完整的应用监控面板呢?
25:如何利用好异步处理
异步处理是互联网应用不可或缺的一种架构模式,大多数业务项目都是由同步处理、异步处理和定时任务处理三种模式相辅相成实现的。
区别于同步处理,异步处理无需同步等待流程处理完毕,因此适用场景主要包括:
- 服务于主流程的分支流程。比如,在注册流程中,把数据写入数据库的操作是主流程,但注册后给用户发优惠券或欢迎短信的操作是分支流程,时效性不那么强,可以进行异步处理。
- 用户不需要实时看到结果的流程。比如,下单后的配货、送货流程完全可以进行异步处理,每个阶段处理完成后,再给用户发推送或短信让用户知晓即可。
异步处理因为可以有 MQ 中间件的介入用于任务的缓冲的分发,所以相比于同步处理,在应对流量洪峰、实现模块解耦和消息广播方面有功能优势。
异步处理虽然好用,但在实现的时候却有三个最容易犯的错,分别是异步处理流程的可靠性问题、消息发送模式的区分问题,以及大量死信消息堵塞队列的问题。
使用 Spring AMQP 来操作 RabbitMQ,所以需要先引入amqp 依赖:
1 2 3 4
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
config
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Configuration public class RabbitConfiguration { public static final String QUEUE = "newuserQueueCompensation"; public static final String EXCHANGE = "newuserExchangeCompensation"; public static final String ROUTING_KEY = "newuserRoutingCompensation"; //队列 @Bean public Queue queue() { return new Queue(QUEUE); } //交换器 @Bean public Exchange exchange() { return ExchangeBuilder.directExchange(EXCHANGE).durable(true).build(); } //绑定 @Bean public Binding binding() { return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY).noargs(); } }
25.1:异步处理需要消息补偿闭环
使用类似 RabbitMQ、RocketMQ 等 MQ 系统来做消息队列实现异步处理,虽然说消息可以落地到磁盘保存,即使 MQ 出现问题消息数据也不会丢失,但是异步流程在消息发送、传输、处理等环节,都可能发生消息丢失。此外,任何 MQ 中间件都无法确保 100% 可用,需要考虑不可用时异步流程如何继续进行。
对于异步处理流程,必须考虑补偿或者说建立主备双活流程。
我们来看一个用户注册后异步发送欢迎消息的场景。用户注册落数据库的流程为同步流程,会员服务收到消息后发送欢迎消息的流程为异步流程。
分析下流程图:
- 蓝色的线,使用 MQ 进行的异步处理,我们称作主线,可能存在消息丢失的情况(虚线代表异步调用)
- 绿色的线,使用补偿 Job 定期进行消息补偿,我们称作备线,用来补偿主线丢失的消息
- 考虑到极端的 MQ 中间件失效的情况,我们要求备线的处理吞吐能力达到主线的能力水平
首先,定义 UserController 用于注册 + 发送异步消息。对于注册方法,我们一次性注册 10 个用户,用户注册消息不能发送出去的概率为 50%。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@RestController @Slf4j @RequestMapping("user") public class UserController { @Autowired private UserService userService; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("register") public void register() { //模拟10个用户注册 IntStream.rangeClosed(1, 10).forEach(i -> { //落库 User user = userService.register(); //模拟50%的消息可能发送失败 if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) { //通过RabbitMQ发送消息 rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user); log.info("sent mq user {}", user.getId()); } }); } }
定义 MemberService 类用于模拟会员服务。会员服务监听用户注册成功的消息,并发送欢迎短信。我们使用 ConcurrentHashMap 来存放那些发过短信的用户 ID 实现幂等,避免相同的用户进行补偿时重复发送短信
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
@Component @Slf4j public class MemberService { //发送欢迎消息的状态 private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>(); //监听用户注册成功的消息,发送欢迎消息 @RabbitListener(queues = RabbitConfiguration.QUEUE) public void listen(User user) { log.info("receive mq user {}", user.getId()); welcome(user); } //发送欢迎消息 public void welcome(User user) { //去重操作 if (welcomeStatus.putIfAbsent(user.getId(), true) == null) { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { } log.info("memberService: welcome new user {}", user.getId()); } } }
对于 MQ 消费程序,处理逻辑务必考虑去重(支持幂等),原因有几个:
- MQ 消息可能会因为中间件本身配置错误、稳定性等原因出现重复
- 自动补偿重复,比如本例,同一条消息可能既走 MQ 也走补偿,肯定会出现重复,而且考虑到高内聚,补偿 Job 本身不会做去重处理
- 人工补偿重复。出现消息堆积时,异步处理流程必然会延迟。如果我们提供了通过后台进行补偿的功能,那么在处理遇到延迟的时候,很可能会先进行人工补偿,过了一段时间后处理程序又收到消息了,重复处理。我之前就遇到过一次由 MQ 故障引发的事故,MQ 中堆积了几十万条发放资金的消息,导致业务无法及时处理,运营以为程序出错了就先通过后台进行了人工处理,结果 MQ 系统恢复后消息又被重复处理了一次,造成大量资金重复发放。
定义补偿 Job 也就是备线操作。我们在 CompensationJob 中定义一个 @Scheduled 定时任务,5 秒做一次补偿操作,因为 Job 并不知道哪些用户注册的消息可能丢失,所以是全量补偿,补偿逻辑是:每 5 秒补偿一次,按顺序一次补偿 5 个用户,下一次补偿操作从上一次补偿的最后一个用户 ID 开始;对于补偿任务我们提交到线程池进行“异步”处理,提高处理能力
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
@Component @Slf4j public class CompensationJob { //补偿Job异步处理线程池 private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor( 10, 10, 1, TimeUnit.HOURS, new ArrayBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get()); @Autowired private UserService userService; @Autowired private MemberService memberService; //目前补偿到哪个用户ID private long offset = 0; //10秒后开始补偿,5秒补偿一次 @Scheduled(initialDelay = 10_000, fixedRate = 5_000) public void compensationJob() { log.info("开始从用户ID {} 补偿", offset); //获取从offset开始的用户 userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> { compensationThreadPool.execute(() -> memberService.welcome(user)); offset = user.getId(); }); } }
为了实现高内聚,主线和备线处理消息,最好使用同一个方法。比如,本例中MemberService 监听到 MQ 消息和 CompensationJob 补偿,调用的都是 welcome 方法。
Demo 中的补偿逻辑比较简单,生产级的代码应该在以下几个方面进行加强:
- 考虑配置补偿的频次、每次处理数量,以及补偿线程池大小等参数为合适的值,以满足补偿的吞吐量
- 考虑备线补偿数据进行适当延迟。比如,对注册时间在 30 秒之前的用户再进行补偿,以方便和主线 MQ 实时流程错开,避免冲突
- 诸如当前补偿到哪个用户的 offset 数据,需要落地数据库
- 补偿 Job 本身需要高可用,可以使用类似 XXLJob 或 ElasticJob 等任务系统
运行程序,执行注册方法注册 10 个用户可以看到:
- 总共 10 个用户,MQ 发送成功的用户有四个,分别是用户 1、5、7、8。
- 补偿任务第一次运行,补偿了用户 2、3、4,第二次运行补偿了用户 6、9,第三次运行补充了用户 10。
- 针对消息的补偿闭环处理的最高标准是,能够达到补偿全量数据的吞吐量。也就是说,如果补偿备线足够完善,即使直接把 MQ 停机,虽然会略微影响处理的及时性,但至少确保流程都能正常执行。
25.2:消息模式是广播还是工作队列
异步处理的一个重要优势,是实现消息广播
消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次
比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式)
对于类似 RocketMQ 这样的 MQ 来说,实现类似功能比较简单直白:如果消费者属于一个组,那么消息只会由同一个组的一个消费者来消费;如果消费者属于不同组,那么每个组都能消费一遍消息。对于 RabbitMQ 来说,消息路由的模式采用的是队列 + 交换器,队列是消息的载体,交换器决定了消息路由到队列的方式,配置比较复杂,容易出错。
第一步,实现会员服务监听用户服务发出的新用户注册消息的那部分逻辑。如果我们启动两个会员服务,那么同一个用户的注册消息应该只能被其中一个实例消费。我们分别实现 RabbitMQ 队列、交换器、绑定三件套。其中,队列用的是匿名队列,交换器用的是直接交换器 DirectExchange,交换器绑定到匿名队列的路由 Key 是空字符串。在收到消息之后,我们会打印所在实例使用的端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
//为了代码简洁直观,我们把消息发布者、消费者、以及MQ的配置代码都放在了一起 @Slf4j @Configuration @RestController @RequestMapping("workqueuewrong") public class WorkQueueWrong { private static final String EXCHANGE = "newuserExchange"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //使用匿名队列作为消息队列 @Bean public Queue queue() { return new AnonymousQueue(); } //声明DirectExchange交换器,绑定队列到交换器 @Bean public Declarables declarables() { DirectExchange exchange = new DirectExchange(EXCHANGE); return new Declarables(queue(), exchange, BindingBuilder.bind(queue()).to(exchange).with("")); } //监听队列,队列名称直接通过SpEL表达式引用Bean @RabbitListener(queues = "#{queue.name}") public void memberService(String userName) { log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port")); } }
使用 12345 和 45678 两个端口启动两个程序实例后,调用 sendMessage 接口发送一条消息,输出的日志,显示同一个会员服务两个实例都收到了消息;出现这个问题的原因是,我们没有理清楚 RabbitMQ 直接交换器和队列的绑定关系。
RabbitMQ 的直接交换器根据 routingKey 对消息进行路由。由于我们的程序每次启动都会创建匿名(随机命名)的队列,所以相当于每一个会员服务实例都对应独立的队列,以空 routingKey 绑定到直接交换器。用户服务发出消息的时候也设置了routingKey 为空,所以直接交换器收到消息之后,发现有两条队列匹配,于是都转发了消息
要修复这个问题其实很简单,对于会员服务不要使用匿名队列,而是使用同一个队列即可。把上面代码中的匿名队列替换为一个普通队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
@Slf4j @Configuration @RestController @RequestMapping("workqueueright") public class WorkQueueRight { private static final String EXCHANGE = "newuserExchange"; private static final String QUEUE = "newuserQueue"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "test", UUID.randomUUID().toString()); } @Bean public Queue queue() { return new Queue(QUEUE); } @Bean public Declarables declarables() { DirectExchange exchange = new DirectExchange(EXCHANGE); return new Declarables(queue(), exchange, BindingBuilder.bind(queue()).to(exchange).with("test")); } @RabbitListener(queues = "#{queue.name}") public void memberService(String userName) { log.info("memberService: welcome message sent to new user {}", userName); } }
测试发现,对于同一条消息来说,两个实例中只有一个实例可以收到,不同的消息按照轮询分发给不同的实例。现在,交换器和队列的关系是这样的
第二步,进一步完整实现用户服务需要广播消息给会员服务和营销服务的逻辑;我们希望会员服务和营销服务都可以收到广播消息,但会员服务或营销服务中的每个实例只需要收到一次消息。我们声明了一个队列和一个广播交换器 FanoutExchange,然后模拟两个用户服务和两个营销服务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
@Slf4j @Configuration @RestController @RequestMapping("fanoutwrong") public class FanoutQueueWrong { private static final String QUEUE = "newuser"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } //声明FanoutExchange,然后绑定到队列,FanoutExchange绑定队列的时候不需要routingKey @Bean public Declarables declarables() { Queue queue = new Queue(QUEUE); FanoutExchange exchange = new FanoutExchange(EXCHANGE); return new Declarables(queue, exchange, BindingBuilder.bind(queue).to(exchange)); } //会员服务实例1 @RabbitListener(queues = QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } //会员服务实例2 @RabbitListener(queues = QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } //营销服务实例1 @RabbitListener(queues = QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } //营销服务实例2 @RabbitListener(queues = QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
我们请求四次 sendMessage 接口,注册四个用户。通过日志可以发现,一条用户注册的消息,要么被会员服务收到,要么被营销服务收到,显然这不是广播。那,我们使用的FanoutExchange,看名字就应该是实现广播的交换器,为什么根本没有起作用呢?
其实,广播交换器非常简单,它会忽略 routingKey,广播消息到所有绑定的队列。在这个案例中,两个会员服务和两个营销服务都绑定了同一个队列,所以这四个服务只能收到一次消息
修改方式很简单,我们把队列进行拆分,会员和营销两组服务分别使用一条独立队列绑定到广播交换器即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
@Slf4j @Configuration @RestController @RequestMapping("fanoutright") public class FanoutQueueRight { private static final String MEMBER_QUEUE = "newusermember"; private static final String PROMOTION_QUEUE = "newuserpromotion"; private static final String EXCHANGE = "newuser"; @Autowired private RabbitTemplate rabbitTemplate; @GetMapping public void sendMessage() { rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString()); } @Bean public Declarables declarables() { //会员服务队列 Queue memberQueue = new Queue(MEMBER_QUEUE); //营销服务队列 Queue promotionQueue = new Queue(PROMOTION_QUEUE); //广播交换器 FanoutExchange exchange = new FanoutExchange(EXCHANGE); //两个队列绑定到同一个交换器 return new Declarables(memberQueue, promotionQueue, exchange, BindingBuilder.bind(memberQueue).to(exchange), BindingBuilder.bind(promotionQueue).to(exchange)); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService1(String userName) { log.info("memberService1: welcome message sent to new user {}", userName); } @RabbitListener(queues = MEMBER_QUEUE) public void memberService2(String userName) { log.info("memberService2: welcome message sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService1(String userName) { log.info("promotionService1: gift sent to new user {}", userName); } @RabbitListener(queues = PROMOTION_QUEUE) public void promotionService2(String userName) { log.info("promotionService2: gift sent to new user {}", userName); } }
现在,交换器和队列的结构是这样的
从日志输出可以验证,对于每一条 MQ 消息,会员服务和营销服务分别都会收到一次,一条消息广播到两个服务的同时,在每一个服务的两个实例中通过轮询接收
- 对于异步流程来说,消息路由模式一旦配置出错,轻则可能导致消息的重复处理,重则可能导致重要的服务无法接收到消息,最终造成业务逻辑错误。
25.3:别让死信堵塞了消息队列
如果线程池的任务队列没有上限,那么最终可能会导致OOM。使用消息队列处理异步流程的时候,我们也同样要注意消息队列的任务堆积问题。对于突发流量引起的消息队列堆积,问题并不大,适当调整消费者的消费能力应该就可以解决。但在很多时候,消息队列的堆积堵塞,是因为有大量始终无法处理的消息。
比如,用户服务在用户注册后发出一条消息,会员服务监听到消息后给用户派发优惠券,但因为用户并没有保存成功,会员服务处理消息始终失败,消息重新进入队列,然后还是处理失败。这种在 MQ 中像幽灵一样回荡的同一条消息,就是死信。
随着 MQ 被越来越多的死信填满,消费者需要花费大量时间反复处理死信,导致正常消息的消费受阻,最终 MQ 可能因为数据量过大而崩溃。
首先,定义一个队列、一个直接交换器,然后把队列绑定到交换器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
@Configuration @Slf4j public class RabbitConfiguration { @Autowired private RabbitTemplate rabbitTemplate; @Bean public Declarables declarables() { //队列 Queue queue = new Queue(Consts.QUEUE); //交换器 DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE); //快速声明一组对象,包含队列、交换器,以及队列到交换器的绑定 return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY)); }
} ```
然后,实现一个 sendMessage 方法来发送消息到 MQ,访问一次提交一条消息,使用自增标识作为消息内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
@RequestMapping("deadletter") @Slf4j @RestController public class DeadLetterController { //自增消息标识 AtomicLong atomicLong = new AtomicLong(); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("sendMessage") public void sendMessage() { String msg = "msg" + atomicLong.incrementAndGet(); log.info("send message {}", msg); //发送消息 rabbitTemplate.convertAndSend(Consts.EXCHANGE, msg); } } public class Consts { public static final String QUEUE = "test"; public static final String EXCHANGE = "test"; public static final String ROUTING_KEY = "test"; public static final String DEAD_EXCHANGE = "deadtest"; public static final String DEAD_QUEUE = "deadtest"; public static final String DEAD_ROUTING_KEY = "deadtest"; }
收到消息后,直接抛出空指针异常,模拟处理出错的情况
1 2 3 4 5 6 7 8 9 10 11 12
@Component @Slf4j public class MQListener { @RabbitListener(queues = Consts.QUEUE) public void handler(String data) { //http://localhost:15672/#/ log.info("got message {}", data); throw new NullPointerException("error"); //throw new AmqpRejectAndDontRequeueException("error"); } }
调用 sendMessage 接口发送两条消息,然后来到 RabbitMQ 管理台,可以看到这两条消息始终在队列中,不断被重新投递,导致重新投递 QPS 达到了 1063;同时,在日志中可以看到大量异常信息
解决死信无限重复进入队列最简单的方式是,在程序处理出错的时候,直接抛出AmqpRejectAndDontRequeueException 异常,避免消息重新进入队列
但,我们更希望的逻辑是,对于同一条消息,能够先进行几次重试,解决因为网络问题导致的偶发消息处理失败,如果还是不行的话,再把消息投递到专门的一个死信队列。对于来自死信队列的数据,我们可能只是记录日志发送报警,即使出现异常也不会再重复投递。整个逻辑如下图所示
针对这个问题,Spring AMQP 提供了非常方便的解决方案:
- 首先,定义死信交换器和死信队列。其实,这些都是普通的交换器和队列,只不过被我们专门用于处理死信消息。
- 然后,通过 RetryInterceptorBuilder 构建一个 RetryOperationsInterceptor,用于处理失败时候的重试。这里的策略是,最多尝试 5 次(重试 4 次);并且采取指数退避重试,首次重试延迟 1 秒,第二次 2 秒,以此类推,最大延迟是 10 秒;如果第 4 次重试还是失败,则使用 RepublishMessageRecoverer 把消息重新投入一个“死信交换器”中。
- 最后,定义死信队列的处理程序。这个案例中,我们只是简单记录日志。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
//RabbitConfiguration添加配置项 //定义死信交换器和队列,并且进行绑定 @Bean public Declarables declarablesForDead() { Queue queue = new Queue(Consts.DEAD_QUEUE); DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE); return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY)); } //定义重试操作拦截器 @Bean public RetryOperationsInterceptor interceptor() { return RetryInterceptorBuilder.stateless() .maxAttempts(5) //最多尝试(不是重试)5次 .backOffOptions(1000, 2.0, 10000) //指数退避重试 .recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY)) .build(); } //通过定义SimpleRabbitListenerContainerFactory,设置其adviceChain属性为之前定义的RetryOperationsInterceptor @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAdviceChain(interceptor()); factory.setConcurrentConsumers(10); //增加工作线程 return factory; }
```
1 2 3 4 5 6
//MQListener添加监听 //死信队列处理程序 @RabbitListener(queues = Consts.DEAD_QUEUE) public void deadHandler(String data) { log.error("got dead message {}", data); }
执行程序,发送两条消息
- msg1 的 4 次重试间隔分别是 1 秒、2 秒、4 秒、8 秒,再加上首次的失败,所以最大尝试次数是 5。
- 4 次重试后,RepublishMessageRecoverer 把消息发往了死信交换器。
- 死信处理程序输出了 got dead message 日志。
虽然我们几乎同时发送了两条消息,但是 msg2 是在 msg1的四次重试全部结束后才开始处理。原因是,默认情况下SimpleMessageListenerContainer 只有一个消费线程。
可以通过增加消费线程来避免性能问题,如下我们直接设置 concurrentConsumers 参数为 10,来增加到 10 个工作线
程;我们也可以设置 maxConcurrentConsumers 参数,来让SimpleMessageListenerContainer 自己动态地调整消费者线程数。不过,我们需要特别注意它的动态开启新线程的策略。
在使用异步处理这种架构模式的时候,我们一般都会使用 MQ 中间件配合实现异步流程,
需要重点考虑四个方面的问题
- 第一,要考虑异步流程丢消息或处理中断的情况,异步流程需要有备线进行补偿。比如,我们今天介绍的全量补偿方式,即便异步流程彻底失效,通过补偿也能让业务继续进行。
- 第二,异步处理的时候需要考虑消息重复的可能性,处理逻辑需要实现幂等,防止重复处理。
- 第三,微服务场景下不同服务多个实例监听消息的情况,一般不同服务需要同时收到相同的消息,而相同服务的多个实例只需要轮询接收消息。我们需要确认 MQ 的消息路由配置是否满足需求,以避免消息重复或漏发问题。
- 第四,要注意始终无法处理的死信消息,可能会引发堵塞 MQ 的问题。一般在遇到消息处理失败的时候,我们可以设置一定的重试策略。如果重试还是不行,那可以把这个消息扔到专有的死信队列特别处理,不要让死信影响到正常消息的处理。
25.4:思考与讨论
在用户注册后发送消息到 MQ,然后会员服务监听消息进行异步处理的场景下,有些时候我们会发现,虽然用户服务先保存数据再发送 MQ,但会员服务收到消息后去查询数据库,却发现数据库中还没有新用户的信息。你觉得,这可能是什么问题呢,又该如何解决呢?
- 可能是数据写到了主库,然后查询了从库。但因为主从同步有延迟,导致没有查询到
- 业务代码把保存数据和发MQ消息放在了一个事务中,有概率收到消息的时候事务还没有提交完成,当时开发同学的处理方式是收MQ消息的时候sleep 1秒,或许应该是先提交事务,完成后再发MQ消息,但是这又出来一个问题MQ消息发送失败怎么办?所以后来演化为建立本地消息表来确保MQ消息可补偿,把业务处理和保存MQ消息到本地消息表操作在相同事务内处理,然后异步发送和补偿发送消息表中的消息到MQ
- 异步处理不仅仅是通过 MQ 来实现,还有其他方式比如开新线程执行,返回 Future还有各种异步框架,比如 Vertx,它是通过 callback 的方式实现
除了使用 Spring AMQP 实现死信消息的重投递外,RabbitMQ 2.8.0 后支持的死信交换器 DLX 也可以实现类似功能。你能尝试用 DLX 实现吗,并比较下这两种处理机制?
- 自定义的私信队列,其实是发送失败,主要是生产者发送到mq的时候,发送失败,进了自定义的私信队列; DLX的方式其实解决已到了mq,但是因为各种原因,无法到达正常的队列中,大概分类下面几种吧: 消息消费时被拒绝(basic.reject / basic.nack),并且requeue = false 消息TTL过期 队列达到最大长度
使用DXL实现延迟重试
config
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
@Configuration @Slf4j public class RabbitConfiguration implements RabbitListenerConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean public MessageHandlerMethodFactory messageHandlerMethodFactory() { DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter consumerJackson2MessageConverter() { return new MappingJackson2MessageConverter(); } @Bean public MessagePropertiesConverter messagePropertiesConverter() { return new DefaultMessagePropertiesConverter(); } @Bean public Declarables declarablesForWorker() { Queue queue = new Queue(Consts.QUEUE); DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE); return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY)); } @Bean public Declarables declarablesForBuffer() { Queue queue = QueueBuilder.durable(Consts.BUFFER_QUEUE) .withArgument("x-dead-letter-exchange", Consts.EXCHANGE) .withArgument("x-dead-letter-routing-key", Consts.ROUTING_KEY) .withArgument("x-message-ttl", Consts.RETRY_INTERNAL) .build(); DirectExchange directExchange = new DirectExchange(Consts.BUFFER_EXCHANGE); return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.BUFFER_ROUTING_KEY)); } @Bean public Declarables declarablesForDead() { Queue queue = new Queue(Consts.DEAD_QUEUE); DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE); return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY)); } }
监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
@Component @Slf4j public class MQListener { @Autowired private MessagePropertiesConverter messagePropertiesConverter; @RabbitListener(queues = Consts.QUEUE) public void handler(@Payload Message message, Channel channel) throws IOException { String m = new String(message.getBody()); try { log.info("Handler 收到消息:{}", m); throw new RuntimeException("处理消息失败"); } catch (Exception e) { Map<String, Object> headers = message.getMessageProperties().getHeaders(); Long retryCount = getRetryCount(headers); if (retryCount < Consts.RETRY_COUNT) { log.info("Handler 消费消息:{} 异常,准备重试第{}次", m, ++retryCount); AMQP.BasicProperties rabbitMQProperties = messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), "UTF-8"); rabbitMQProperties.builder().headers(headers); channel.basicPublish(Consts.BUFFER_EXCHANGE, Consts.BUFFER_ROUTING_KEY, rabbitMQProperties, message.getBody()); } else { log.info("Handler 消费消息:{} 异常,已重试 {} 次,发送到死信队列处理!", m, Consts.RETRY_COUNT); channel.basicPublish(Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY, null, message.getBody()); } } } private long getRetryCount(Map<String, Object> headers) { long retryCount = 0; if (null != headers) { if (headers.containsKey("x-death")) { List<Map<String, Object>> deathList = (List<Map<String, Object>>) headers.get("x-death"); if (!deathList.isEmpty()) { Map<String, Object> deathEntry = deathList.get(0); retryCount = (Long) deathEntry.get("count"); } } } return retryCount; } @RabbitListener(queues = Consts.DEAD_QUEUE) public void deadHandler(@Payload Message message) { log.error("DeadHandler 收到死信消息: {}", new String(message.getBody())); } }
controller
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
@RequestMapping("deadletter") @Slf4j @RestController public class DeadLetterController { AtomicLong atomicLong = new AtomicLong(); @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("send") public void send() { String message = "msg" + atomicLong.incrementAndGet(); log.info("Client 发送消息 {}", message); rabbitTemplate.convertAndSend(Consts.EXCHANGE, Consts.QUEUE, message); } } public class Consts { public static final Integer RETRY_INTERNAL = 3000; public static final Integer RETRY_COUNT = 2;
1 2 3 4 5 6 7 8 9 10 11
public static final String EXCHANGE = "worker"; public static final String QUEUE = "worker"; public static final String ROUTING_KEY = "worker"; public static final String BUFFER_QUEUE = "buffer"; public static final String BUFFER_EXCHANGE = "buffer"; public static final String BUFFER_ROUTING_KEY = "buffer"; public static final String DEAD_EXCHANGE = "dead"; public static final String DEAD_QUEUE = "dead"; public static final String DEAD_ROUTING_KEY = "dead"; } ```
- demo 是基于canal做mysql数据同步,需要将解析好的数据发到kafka里面,再进行处理。在使用的时候发现这么一个问题,就是kafka多partition消费时不能保证消息的顺序消费,进而导致mysql数据同步异常。由于kafka可以保证在同一个partition内消息有序,于是我自定义了一个分区器,将数据的id取hashcode然后根据partition的数量取余作为分区号,保证同一条数据的binlog能投递到同一个partition中,从而达到消息顺序消费的目的
26:NOSQL与RDBMS相辅相成
- NoSQL 一般可以分为缓存数据库(Redis)、时间序列数据库( InfluxDB)、全文搜索数据库(ElasticSearch)、文档数据库(MongoDB)、图数据库等。
26.1:Redis VS MySQL
- Redis 是一款设计简洁的缓存数据库,数据都保存在内存中,所以读写单一 Key 的性能非常高
- 做一个简单测试,分别填充 10 万条数据到 Redis 和 MySQL 中。MySQL 中的name 字段做了索引,相当于 Redis 的 Key,data 字段为 100 字节的数据,相当于 Redis的 Value
- 比较一下从 MySQL 和 Redis 随机读取单条数据的性能。“公平”起见,像 Redis那样,我们使用 MySQL 时也根据 Key 来查 Value,也就是根据 name 字段来查 data 字段,并且我们给 name 字段做了索引
- MySQL 90% 的请求需要 61ms,QPS 为 1460;而 Redis 90% 的请求在 5ms 左右,QPS 达到了14008,几乎是 MySQL 的十倍
- 但 Redis 薄弱的地方是,不擅长做 Key 的搜索。对 MySQL,我们可以使用 LIKE 操作前匹配走 B+ 树索引实现快速搜索;但对 Redis,我们使用 Keys 命令对 Key 的搜索,其实相当于在 MySQL 里做全表扫描。
- Redis 慢的原因有两个:
- Redis 的 Keys 命令是 O(n) 时间复杂度。如果数据库中 Key 的数量很多,就会非常慢。
- Redis 是单线程的,对于慢的命令如果有并发,串行执行就会非常耗时。
- 一般而言,我们使用 Redis 都是针对某一个 Key 来使用,而不能在业务代码中使用 Keys命令从 Redis 中“搜索数据”,因为这不是 Redis 的擅长。对于 Key 的搜索,我们可以先通过关系型数据库进行,然后再从 Redis 存取数据(如果实在需要搜索 Key 可以使用SCAN 命令)。在生产环境中,我们一般也会配置 Redis 禁用类似 Keys 这种比较危险的命令
- 对于业务开发来说,大多数业务场景下Redis 是作为关系型数据库的辅助用于缓存的,我们一般不会把它当作数据库独立使用
- Redis 提供了丰富的数据结构(Set、SortedSet、Hash、List),并围绕这些数据结构提供了丰富的 API。如果我们好好利用这个特点的话,可以直接在 Redis中完成一部分服务端计算,避免“读取缓存 -> 计算数据 -> 保存缓存”三部曲中的读取和保存缓存的开销,进一步提高性能
26.2:InfluxDB vs MySQL
- InfluxDB 是一款优秀的时序数据库;时序数据库的优势,在于处理指标数据的聚合,并且读写效率非常高。
- 我们分别填充了 1000 万条数据到 MySQL 和 InfluxDB 中。其中,每条数据只有 ID、时间戳、10000 以内的随机值这 3 列信息,对于 MySQL 我们把时间戳列做了索引
- InfluxDB 批量插入 1000 万条数据仅用了 54 秒,相当于每秒插入 18 万条数据,速度相当快;MySQL 的批量插入,速度也挺快达到了每秒 4.8 万。
- 对这 1000 万数据进行一个统计,查询最近 60 天的数据,按照 1 小时的时间粒度聚合,统计 value 列的最大值、最小值和平均值,并将统计结果绘制成曲线图;可以看到单次查询MySQL 查询一次耗时 29 秒左右,而 InfluxDB 耗时 980ms
- 在按照时间区间聚合的案例上,我们看到了 InfluxDB 的性能优势。但,我们肯定不能把InfluxDB 当作普通数据库,原因是:
- InfluxDB 不支持数据更新操作,毕竟时间数据只能随着时间产生新数据,肯定无法对过去的数据做修改
- 从数据结构上说,时间序列数据数据没有单一的主键标识,必须包含时间戳,数据只能和时间戳进行关联,不适合普通业务数据
- 对于 MySQL 而言,针对大量的数据使用全表扫描的方式来聚合统计指标数据,性能非常差,一般只能作为临时方案来使用。此时,引入 InfluxDB 之类的时间序列数据库,就很有必要了。时间序列数据库可以作为特定场景(比如监控、统计)的主存储,也可以和关系型数据库搭配使用,作为一个辅助数据源,保存业务系统的指标数据
26.3:Elasticsearch vs MySQL
- Elasticsearch(以下简称 ES),是目前非常流行的分布式搜索和分析数据库,独特的倒排索引结构尤其适合进行全文搜索
- 简单来讲,倒排索引可以认为是一个 Map,其 Key 是分词之后的关键字,Value 是文档ID/ 片段 ID 的列表。我们只要输入需要搜索的单词,就可以直接在这个 Map 中得到所有包含这个单词的文档 ID/ 片段 ID 列表,然后再根据其中的文档 ID/ 片段 ID 查询出实际的文档内容。
- 对比下使用 ES 进行关键字全文搜索、在 MySQL 中使用 LIKE 进行搜索的效率差距。
- MySQL 可以做到仅更新某行数据的某个字段,但 ES 里每次数据字段更新都相当于整个文档索引重建。即便 ES 提供了文档部分更新的功能,但本质上只是节省了提交文档的网络流量,以及减少了更新冲突,其内部实现还是文档删除后重新构建索引。因此,如果要在 ES中保存一个类似计数器的值,要实现不断更新,其执行效率会非常低
- ES 是一个分布式的全文搜索数据库,所以与 MySQL 相比的优势在于文本搜索,而且因为其分布式的特性,可以使用一个大 ES 集群处理大规模数据的内容搜索。但,由于 ES 的索引是文档维度的,所以不适用于频繁更新的 OLTP 业务
- 一般而言,我们会把 ES 和 MySQL 结合使用,MySQL 直接承担业务系统的增删改操作,而 ES 作为辅助数据库,直接扁平化保存一份业务数据,用于复杂查询、全文搜索和统计
26.4:应对高并发的复合数据库架构
- MySQL InnoDB 引擎的 B+ 树对排序和范围查询友好,频繁数据更新的代价不是太大,因此适合 OLTP(On-Line Transaction Processing)
- ES 的 Lucene 采用了 FST(Finite State Transducer)索引 + 倒排索引,空间效率高,适合对变动不频繁的数据做索引,实现全文搜索。存储系统本身不可能对一份数据使用多种数据结构保存,因此不可能适用于所有场景
- 虽然在大多数业务场景下,MySQL 的性能都不算太差,但对于数据量大、访问量大、业务复杂的互联网应用来说,MySQL 因为实现了 ACID(原子性、一致性、隔离性、持久性)会比较重,而且横向扩展能力较差、功能单一,无法扛下所有数据量和流量,无法应对所有功能需求。因此,我们需要通过架构手段,来组合使用多种存储系统,取长补短,实现1+1>2 的效果
- 设计了一个包含多个数据库系统的、能应对各种高并发场景的一套数据服务的系统架构,其中包含了同步写服务、异步写服务和查询服务三部分,分别实现主数据库写入、辅助数据库写入和查询路由

- 如图上蓝色线所示,写入两种 MySQL 数据表和发送 MQ 消息的这三步,我们用一个同步写服务完成了。
- 如图中绿色线所示,有一个异步写服务,监听 MQ 的消息,继续完成辅助数据的更新操作。这里我们选用了 ES 和 InfluxDB 这两种辅助数据库,因此整个异步写数据操作有三步:
- MQ 消息不一定包含完整的数据,甚至可能只包含一个最新数据的主键 ID,我们需要根据 ID 从查询服务查询到完整的数据
- 写入 InfluxDB 的数据一般可以按时间间隔进行简单聚合,定时写入 InfluxDB。因此,这里会进行简单的客户端聚合,然后写入 InfluxDB
- ES 不适合在各索引之间做连接(Join)操作,适合保存扁平化的数据。比如,我们可以把订单下的用户、商户、商品列表等信息,作为内嵌对象嵌入整个订单 JSON,然后把整个扁平化的 JSON 直接存入 ES
- 对于数据写入操作,我们认为操作返回的时候同步数据一定是写入成功的,但是由于各种原因,异步数据写入无法确保立即成功,会有一定延迟,比如:
- 异步消息丢失的情况,需要补偿处理
- 写入 ES 的索引操作本身就会比较慢
- 写入 InfluxDB 的数据需要客户端定时聚合
- 对于查询服务,如图中红色线所示,我们需要根据一定的上下文条件(比如查询一致性要求、时效性要求、搜索的条件、需要返回的数据字段、搜索时间区间等)来把请求路由到合适的数据库,并且做一些聚合处理:
- 需要根据主键查询单条数据,可以从 MySQL Sharding 集群或 Redis 查询,如果对实时性要求不高也可以从 ES 查询
- 按照多个条件搜索订单的场景,可以从 MySQL 索引表查询出主键列表,然后再根据主键从 MySQL Sharding 集群或 Redis 获取数据详情
- 各种后台系统需要使用比较复杂的搜索条件,甚至全文搜索来查询订单数据,或是定时分析任务需要一次查询大量数据,这些场景对数据实时性要求都不高,可以到 ES 进行搜索。此外,MySQL 中的数据可以归档,我们可以在 ES 中保留更久的数据,而且查询历史数据一般并发不会很大,可以统一路由到 ES 查询
- 重要的业务主数据只能保存在 MySQL 这样的关系型数据库中,原因有三点:
- RDBMS 经过了几十年的验证,已经非常成熟
- RDBMS 的用户数量众多,Bug 修复快、版本稳定、可靠性很高
- RDBMS 强调 ACID,能确保数据完整
- 有两种类型的查询任务可以交给 MySQL 来做,性能会比较好,这也是 MySQL 擅长的地方:
- 按照主键 ID 的查询。直接查询聚簇索引,其性能会很高。但是单表数据量超过亿级后,性能也会衰退,而且单个数据库无法承受超大的查询并发,因此我们可以把数据表进行Sharding 操作,均匀拆分到多个数据库实例中保存。我们把这套数据库集群称作Sharding 集群
- 按照各种条件进行范围查询,查出主键 ID。对二级索引进行查询得到主键,只需要查询一棵 B+ 树,效率同样很高。但索引的值不宜过大,比如对 varchar(1000) 进行索引不太合适,而索引外键(一般是 int 或 bigint 类型)性能就会比较好。因此,我们可以在MySQL 中建立一张“索引表”,除了保存主键外,主要是保存各种关联表的外键,以及尽可能少的 varchar 类型的字段。这张索引表的大部分列都可以建上二级索引,用于进行简单搜索,搜索的结果是主键的列表,而不是完整的数据。由于索引表字段轻量并且数量不多(一般控制在 10 个以内),所以即便索引表没有进行 Sharding 拆分,问题也不会很大
26.5:思考与讨论
- 文档数据库 MongoDB,也是一种常用的 NoSQL。你觉得 MongoDB 的优势和劣势是什么呢?它适合用在什么场景下呢?
27:不要轻信客户端的数据
- 客户端传给服务端的数据只是信息收集,数据需要经过有效性验证、权限验证等后才能使用,并且这些数据只能认为是用户操作的意图,不能直接代表数据当前的状态。
27.1:客户端的计算不可信
- 用户下单时客户端肯定有商品的价格等信息,也会计算出订单的总价给用户确认,但是这些信息只能用于呈现和核对。即使客户端传给服务端的 POJO 中包含了这些信息,服务端也一定要重新从数据库来初始化商品的价格,重新计算最终的订单价格。如果不这么做的话,很可能会被黑客利用,商品总价被恶意修改为比较低的价格
- 我们真正直接使用的、可信赖的只是客户端传过来的商品 ID 和数量,服务端会根据这些信息重新计算最终的总价。如果服务端计算出来的商品价格和客户端传过来的价格不匹配的话,可以给客户端友好提示,让用户重新下单
- 下单成功后,服务端处理完成后会返回诸如商品单价、总价等信息给客户端。此时,客户端可以进行一次判断,如果和之前客户端的数据不一致的话,给予用户提示,用户确认没问题后再进入支付阶段
27.2:客户端提交的参数需要校验
使用 Spring Validation 采用注解的方式进行参数校验,更优雅
1 2 3 4 5 6 7 8 9 10
@Validated public class TrustClientParameterController { @PostMapping("/better") @ResponseBody public String better(@RequestParam("countryId") @Min(value = 1, message = "非法参数") @Max(value = 3, message = "非法参数") int countryId) { return allCountries.get(countryId).getName(); } }
27.3:不能信任请求头里的任何内容
一个比较常见的需求是,为了防刷,我们需要判断用户的唯一性。比如,针对未注册的新用户发送一些小奖品,我们不希望相同用户多次获得奖品。考虑到未注册的用户因为没有登录过所以没有用户标识,我们可能会想到根据请求的 IP 地址,来判断用户是否已经领过奖品。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
//通过一个 HashSet 模拟已发放过奖品的 IP 名单,每次领取奖品后把 IP 地址加入这个名单中。IP 地址的获取方式是:优先通过 X-Forwarded-For 请求头来获取,如果没有的话再通过 HttpServletRequest 的 getRemoteAddr 方法来获取。 @Slf4j @RequestMapping("trustclientip") @RestController public class TrustClientIpController { HashSet<String> activityLimit = new HashSet<>(); @GetMapping("test") public String test(HttpServletRequest request) { String ip = getClientIp(request); if (activityLimit.contains(ip)) { return "您已经领取过奖品"; } else { activityLimit.add(ip); return "奖品领取成功"; } } private String getClientIp(HttpServletRequest request) { String xff = request.getHeader("X-Forwarded-For"); if (xff == null) { return request.getRemoteAddr(); } else { return xff.contains(",") ? xff.split(",")[0] : xff; } } }
通常我们的应用之前都部署了反向代理或负载均衡器,remoteAddr获得的只能是代理的 IP 地址,而不是访问用户实际的 IP。这不符合我们的需求,因为反向代理在转发请求时,通常会把用户真实 IP 放入 X-Forwarded-For 这个请求头中。
这种过于依赖 X-Forwarded-For 请求头来判断用户唯一性的实现方式,是有问题的:完全可以通过 cURL 类似的工具来模拟请求,随意篡改头的内容
- 因此,IP 地址或者说请求头里的任何信息,包括 Cookie 中的信息、Referer,只能用作参考,不能用作重要逻辑判断的依据。而对于类似这个案例唯一性的判断需求,更好的做法是,让用户进行登录或三方授权登录(比如微信),拿到用户标识来做唯一性判断。
27.4:用户标识不能从客户端获取
如果希望每一个需要登录的方法,都从 Session 中获得当前用户标识,并进行一些后续处理的话,我们没有必要在每一个方法内都复制粘贴相同的获取用户身份的逻辑,可以定义一个自定义注解 @LoginRequired 到 userId 参数上,然后通过HandlerMethodArgumentResolver 自动实现参数的组装
1 2 3 4 5 6
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @Documented public @interface LoginRequired { String sessionKey() default"currentUser"; }
1 2
@GetMapping("right") public String right(@LoginRequired Long userId) {return"当前用户Id:" + userId;}
魔法来自 HandlerMethodArgumentResolver。我们自定义了一个实现类LoginRequiredArgumentResolver,实现了 HandlerMethodArgumentResolver 接口的2 个方法:
- supportsParameter 方法判断当参数上有 @LoginRequired 注解时,再做自定义参数解析的处理
- resolveArgument 方法用来实现解析逻辑本身。在这里,我们尝试从 Session 中获取当前用户的标识,如果无法获取到的话提示非法调用的错误,如果获取到则返回 userId。这样一来,Controller 中的 userId 参数就可以自动赋值了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
@Slf4j public class LoginRequiredArgumentResolver implements HandlerMethodArgumentResolver { //解析哪些参数 @Override public boolean supportsParameter(MethodParameter methodParameter) { //匹配参数上具有@LoginRequired注解的参数 return methodParameter.hasParameterAnnotation(LoginRequired.class); } @Override public Object resolveArgument(MethodParameter methodParameter, ModelAndViewContainer modelAndViewContainer, NativeWebRequest nativeWebRequest, WebDataBinderFactory webDataBinderFactory) throws Exception { //从参数上获得注解 LoginRequired loginRequired = methodParameter.getParameterAnnotation(LoginRequired.class); //根据注解中的Session Key,从Session中查询用户信息 Object object = nativeWebRequest.getAttribute(loginRequired.sessionKey(), NativeWebRequest.SCOPE_SESSION); if (object == null) { log.error("接口 {} 非法调用!", methodParameter.getMethod().toString()); throw new RuntimeException("请先登录!"); } return object; } }
我们要实现 WebMvcConfigurer 接口的 addArgumentResolvers 方法,来增加这个自定义的处理器 LoginRequiredArgumentResolver
1 2 3 4 5 6 7 8 9 10 11 12
@SpringBootApplication public class CommonMistakesApplication implements WebMvcConfigurer { public static void main(String[] args) { SpringApplication.run(CommonMistakesApplication.class, args); } @Override public void addArgumentResolvers(List<HandlerMethodArgumentResolver> resolvers) { resolvers.add(new LoginRequiredArgumentResolver()); } }
- 测试发现,经过这样的实现,登录后所有需要登录的方法都可以一键通过加@LoginRequired 注解来拿到用户标识,方便且安全
28:安全兜底
- 任何涉及钱的代码必须要考虑防刷、限量和防重,要做好安全兜底。
- 涉及钱的代码,主要有以下三类:
- 第一,代码本身涉及有偿使用的三方服务。如果因为代码本身缺少授权、用量控制而被利用导致大量调用,势必会消耗大量的钱,给公司造成损失。有些三方服务可能采用后付款方式的结算,出现问题后如果没及时发现,下个月结算时就会收到一笔数额巨大的账单
- 第二,代码涉及虚拟资产的发放,比如积分、优惠券等。虽然说虚拟资产不直接对应货币,但一般可以在平台兑换具有真实价值的资产。比如,优惠券可以在下单时使用,积分可以兑换积分商城的商品。所以从某种意义上说,虚拟资产就是具有一定价值的钱,但因为不直接涉及钱和外部资金通道,所以容易产生随意性发放而导致漏洞
- 第三,代码涉及真实钱的进出。比如,对用户扣款,如果出现非正常的多次重复扣款,小则用户投诉、用户流失,大则被相关管理机构要求停业整改,影响业务。又比如,给用户发放返现的付款功能,如果出现漏洞造成重复付款,涉及 B 端的可能还好,但涉及 C 端用户的重复付款可能永远无法追回
28.1:开放平台资源的使用需要考虑防刷
- 短信验证码服务属于开放性服务,由用户侧触发,且因为是注册验证码所以不需要登录就可以使用。如果我们的发短信接口像这样没有任何防刷的防护,直接调用三方短信通道,就相当于“裸奔”,很容易被短信轰炸平台利用
- 对于短信验证码这种开放接口,程序逻辑内需要有防刷逻辑。好的防刷逻辑是,对正常使用的用户毫无影响,只有疑似异常使用的用户才会感受到。对于短信验证码,有如下 4 种可行的方式来防刷
- 第一种方式,只有固定的请求头才能发送验证码
- 我们通过请求头中网页或 App 客户端传给服务端的一些额外参数,来判断请求是不是 App 发起的。其实,这种方式“防君子不防小人”
- 比如,判断是否存在浏览器或手机型号、设备分辨率请求头。对于那些使用爬虫来抓取短信接口地址的程序来说,往往只能抓取到 URL,而难以分析出请求发送短信还需要的额外请求头,可以看作第一道基本防御
- 第二种方式,只有先到过注册页面才能发送验证码
- 对于普通用户来说,不管是通过 App 注册还是 H5 页面注册,一定是先进入注册页面才能看到发送验证码按钮,再点击发送。我们可以在页面或界面打开时请求固定的前置接口,为这个设备开启允许发送验证码的窗口,之后的请求发送验证码才是有效请求
- 这种方式可以防御直接绕开固定流程,通过接口直接调用的发送验证码请求,并不会干扰普通用户
- 第三种方式,控制相同手机号的发送次数和发送频次
- 除非是短信无法收到,否则用户不太会请求了验证码后不完成注册流程,再重新请求。因此,我们可以限制同一手机号每天的最大请求次数。验证码的到达需要时间,太短的发送间隔没有意义,所以我们还可以控制发送的最短间隔。比如,我们可以控制相同手机号一天只能发送 10 次验证码,最短发送间隔 1 分钟
- 第四种方式,增加前置图形验证码
- 短信轰炸平台一般会收集很多免费短信接口,一个接口只会给一个用户发一次短信,所以控制相同手机号发送次数和间隔的方式不够有效。这时,我们可以考虑对用户体验稍微有影响,但也是最有效的方式作为保底,即将弹出图形验证码作为前置
- 除了图形验证码,我们还可以使用其他更友好的人机验证手段(比如滑动、点击验证码等),甚至是引入比较新潮的无感知验证码方案(比如,通过判断用户输入手机号的打字节奏,来判断是用户还是机器),来改善用户体验
- 此外,我们也可以考虑在监测到异常的情况下再弹出人机检测。比如,短时间内大量相同远端 IP 发送验证码的时候,才会触发人机检测
28.2:虚拟资产并不能凭空产生无限使用
- 把优惠券看作一种资源,其生产不是凭空的,而是需要事先申请,理由是:
- 虚拟资产如果最终可以对应到真实金钱上的优惠,那么,能发多少取决于运营和财务的核算,应该是有计划、有上限的。引言提到的无门槛优惠券,需要特别小心。有门槛优惠券的大量使用至少会带来大量真实的消费,而使用无门槛优惠券下的订单,可能用户一分钱都没有支付
- 即使虚拟资产不值钱,大量不合常规的虚拟资产流入市场,也会冲垮虚拟资产的经济体系,造成虚拟货币的极速贬值。有量的控制才有价值
- 资产的申请需要理由,甚至需要走流程,这样才可以追溯是什么活动需要、谁提出的申请,程序依据申请批次来发放
28.3:钱的进出要和订单挂钩并且实现幂等
- 第一,任何资金操作都需要在平台侧生成业务属性的订单,可以是优惠券发放订单,可以是返现订单,也可以是借款订单,一定是先有订单再去做资金操作。同时,订单的产生需要有业务属性。业务属性是指,订单不是凭空产生的,否则就没有控制的意义。比如,返现发放订单必须关联到原先的商品订单产生;再比如,借款订单必须关联到同一个借款合同产生
- 第二,一定要做好防重,也就是实现幂等处理,并且幂等处理必须是全链路的。这里的全链路是指,从前到后都需要有相同的业务订单号来贯穿,实现最终的支付防重
- 对于支付操作,我们一定是调用三方支付公司的接口或银行接口进行处理的。一般而言,这些接口都会有商户订单号的概念,对于相同的商户订单号,无法进行重复的资金处理,所以三方公司的接口可以实现唯一订单号的幂等处理
- 但是,业务系统在实现资金操作时容易犯的错是,没有自始至终地使用一个订单号作为商户订单号,透传给三方支付接口。出现这个问题的原因是,比较大的互联网公司一般会把支付独立一个部门。支付部门可能会针对支付做聚合操作,内部会维护一个支付订单号,然后使用支付订单号和三方支付接口交互。最终虽然商品订单是一个,但支付订单是多个,相同的商品订单因为产生多个支付订单导致多次支付
- 如果说,支付出现了重复扣款,我们可以给用户进行退款操作,但给用户付款的操作一旦出现重复付款,就很难把钱追回来了,所以更要小心
- 这,就是全链路的意义,从一开始就需要先有业务订单产生,然后使用相同的业务订单号一直贯穿到最后的资金通路,才能真正避免重复资金操作
29:数据和代码
- Web 安全方面的很多漏洞,都是源自把数据当成了代码来执行,也就是注入类问题,比如:
- 客户端提供给服务端的查询值,是一个数据,会成为 SQL 查询的一部分。黑客通过修改这个值注入一些 SQL,来达到在服务端运行 SQL 的目的,相当于把查询条件的数据变为了查询代码。这种攻击方式,叫做 SQL 注入。
- 对于规则引擎,我们可能会用动态语言做一些计算,和 SQL 注入一样外部传入的数据只能当做数据使用,如果被黑客利用传入了代码,那么代码可能就会被动态执行。这种攻击方式,叫做代码注入。
- 对于用户注册、留言评论等功能,服务端会从客户端收集一些信息,本来用户名、邮箱这类信息是纯文本信息,但是黑客把信息替换为了 JavaScript 代码。那么,这些信息在页面呈现时,可能就相当于执行了 JavaScript 代码。甚至是,服务端可能把这样的代码,当作普通信息保存到了数据库。黑客通过构建 JavaScript 代码来实现修改页面呈现、盗取信息,甚至蠕虫攻击的方式,叫做 XSS(跨站脚本)攻击。
29.1:SQL注入
- 最经典的 SQL 注入的例子,是通过构造’or’1’=’1 作为密码实现登录。这种简单的攻击方式,在十几年前可以突破很多后台的登录,但现在很难奏效了。
- 最近几年,我们的安全意识增强了,都知道使用参数化查询来避免 SQL 注入问题。其中的原理是,使用参数化查询的话,参数只能作为普通数据,不可能作为 SQL 的一部分,以此有效避免 SQL 注入问题。
- 虽然我们已经开始关注 SQL 注入的问题,但还是有一些认知上的误区,主要表现在以下三个方面:
- 第一,认为 SQL 注入问题只可能发生于 Http Get 请求,也就是通过 URL 传入的参数才可能产生注入点。这是很危险的想法。从注入的难易度上来说,修改 URL 上的QueryString 和修改 Post 请求体中的数据,没有任何区别,因为黑客是通过工具来注入的,而不是通过修改浏览器上的 URL 来注入的。甚至 Cookie 都可以用来 SQL 注入,任何提供数据的地方都可能成为注入点。
- 第二,认为不返回数据的接口,不可能存在注入问题。其实,黑客完全可以利用 SQL 语句构造出一些不正确的 SQL,导致执行出错。如果服务端直接显示了错误信息,那黑客需要的数据就有可能被带出来,从而达到查询数据的目的。甚至是,即使没有详细的出错信息,黑客也可以通过所谓盲注的方式进行攻击。我后面再具体解释。
- 第三,认为 SQL 注入的影响范围,只是通过短路实现突破登录,只需要登录操作加强防范即可。首先,SQL 注入完全可以实现拖库,也就是下载整个数据库的内容(之后我们会演示),SQL 注入的危害不仅仅是突破后台登录。其次,根据木桶原理,整个站点的安全性受限于安全级别最低的那块短板。因此,对于安全问题,站点的所有模块必须一视同仁,并不是只加强防范所谓的重点模块
30:敏感数据处理
- 从安全角度来聊聊用户名、密码、身份证等敏感信息,应该怎么保存和传输。同时,你还可以进一步复习加密算法中的散列、对称加密和非对称加密算法,以及 HTTPS 等相关知识
30.1:怎样保存用户密码
为了防止密码泄露,最重要的原则是不要保存用户密码。你可能会觉得很好笑,不保存用户密码,之后用户登录的时候怎么验证?其实,我指的是不保存原始密码,这样即使拖库也不会泄露用户密码
不要明文保存用户密码,应该把密码通过 MD5 加密后保存。这的确是一个正确的方向,但这个说法并不准确
首先,MD5 其实不是真正的加密算法。所谓加密算法,是可以使用密钥把明文加密为密文,随后还可以使用密钥解密出明文,是双向的
而 MD5 是散列、哈希算法或者摘要算法。不管多长的数据,使用 MD5 运算后得到的都是固定长度的摘要信息或指纹信息,无法再解密为原始数据。所以,MD5 是单向的。最重要的是,仅仅使用 MD5 对密码进行摘要,并不安全
虽然 MD5 不可解密,但是我们可以构建一个超大的数据库,把所有20 位以内的数字和字母组合的密码全部计算一遍 MD5 存进去,需要解密的时候搜索一下MD5 就可以得到原始值了。这就是字典表
目前,有些 MD5 解密网站使用的是彩虹表,是一种使用时间空间平衡的技术,即可以使用更大的空间来降低破解时间,也可以使用更长的破解时间来换取更小的空间
所以直接保存 MD5 后的密码是不安全的。一些同学可能会说,还需要加盐。是的,但是加盐如果不当,还是非常不安全,比较重要的有两点。
- 第一,不能在代码中写死盐,且盐需要有一定的长度;对于这样一串 MD5,虽然破解网站上找不到原始密码,但是黑客可以自己注册一个账号,使用一个简单的密码,比如 1;然后,再去破解网站试一下这个 MD5,就可以得到原始密码是 salt,也就知道了盐值是salt;所以,最好是每一个密码都有独立的盐,并且盐要长一点,比如超过 20 位
- 第二,虽然说每个人的盐最好不同,但我也不建议将一部分用户数据作为盐。比如,使用用户名作为盐;所以,盐最好是随机的值,并且是全球唯一的,意味着全球不可能有现成的彩虹表给你用;正确的做法是,使用全球唯一的、和用户无关的、足够长的随机值作为盐。比如,可以使用UUID 作为盐,把盐一起保存到数据库中;并且每次用户修改密码的时候都重新计算盐,重新保存新的密码
更好的做法是,不要使用像 MD5 这样快速的摘要算法,而是使用慢一点的算法。比如Spring Security 已经废弃了 MessageDigestPasswordEncoder,推荐使用BCryptPasswordEncoder,也就是BCrypt来进行密码哈希。BCrypt 是为保存密码设计的算法,相比 MD5 要慢很多
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
@GetMapping("performance") public void performance() { StopWatch stopWatch = new StopWatch(); String password = "Abcd1234"; stopWatch.start("MD5"); //MD5 DigestUtils.md5Hex(password); stopWatch.stop(); stopWatch.start("BCrypt(10)"); //代价因子为10的BCrypt String hash1 = BCrypt.gensalt(10); BCrypt.hashpw(password, hash1); System.out.println(hash1); stopWatch.stop(); stopWatch.start("BCrypt(12)"); //代价因子为12的BCrypt String hash2 = BCrypt.gensalt(12); BCrypt.hashpw(password, hash2); System.out.println(hash2); stopWatch.stop(); stopWatch.start("BCrypt(14)"); //代价因子为14的BCrypt String hash3 = BCrypt.gensalt(14); BCrypt.hashpw(password, hash3); System.out.println(hash3); stopWatch.stop(); log.info("{}", stopWatch.prettyPrint()); }
可以看到,MD5 只需要 0.8 毫秒,而三次 BCrypt 哈希(代价因子分别设置为 10、12 和14)耗时分别是 82 毫秒、312 毫秒和 1.2 秒
我们写一段代码观察下,BCryptPasswordEncoder 生成的密码哈希的规律
1 2 3 4 5 6 7 8 9 10
@GetMapping("better") public UserData better(@RequestParam(value = "name", defaultValue = "朱晔") String name, @RequestParam(value = "password", defaultValue = "Abcd1234") String password) { UserData userData = new UserData(); userData.setId(1L); userData.setName(name); userData.setPassword(passwordEncoder.encode(password)); userRepository.save(userData); log.info("match ? {}", passwordEncoder.matches(password, userData.getPassword())); return userData; }
我们可以发现三点规律:
第一,我们调用 encode、matches 方法进行哈希、做密码比对的时候,不需要传入盐。BCrypt 把盐作为了算法的一部分,强制我们遵循安全保存密码的最佳实践
第二,生成的盐和哈希后的密码拼在了一起:$是字段分隔符,其中第一个$后的 2a 代表算法版本,第二个$后的 10 是代价因子(默认是 10,代表 2 的 10 次方次哈希),第三个$后的 22 个字符是盐,再后面是摘要。所以说,我们不需要使用单独的数据库字段来保存盐。
1 2 3
"password": "$2a$10$wPWdQwfQO2lMxqSIb6iCROXv7lKnQq5XdMO96iCYCj7boK9pk6QPC" //格式为:$<ver>$<cost>$<salt><digest>- 第三,代价因子的值越大,BCrypt 哈希的耗时越久。因此,对于代价因子的值,更建议的实践是,根据用户的忍耐程度和硬件,设置一个尽可能大的值
- 除了做好密码哈希保存的工作外,我们还要建设一套完善的安全防御机制,在感知到暴力破解危害的时候,开启短信验证、图形验证码、账号暂时锁定等防御机制来抵御暴力破解
30.2:保存姓名和身份证
我们把姓名和身份证,叫做二要素
单向散列算法,显然不适合用来加密保存二要素,因为数据无法解密。这个时候,我们需要选择真正的加密算法。可供选择的算法,包括对称加密和非对称加密算法两类
对称加密算法,是使用相同的密钥进行加密和解密。使用对称加密算法来加密双方的通信的话,双方需要先约定一个密钥,加密方才能加密,接收方才能解密。如果密钥在发送的时候被窃取,那么加密就是白忙一场。因此,这种加密方式的特点是,加密速度比较快,但是密钥传输分发有泄露风险
非对称加密算法,或者叫公钥密码算法。公钥密码是由一对密钥对构成的,使用公钥或者说加密密钥来加密,使用私钥或者说解密密钥来解密,公钥可以任意公开,私钥不能公开。使用非对称加密的话,通信双方可以仅分享公钥用于加密,加密后的数据没有私钥无法解密。因此,这种加密方式的特点是,加密速度比较慢,但是解决了密钥的配送分发安全问题
对于保存敏感信息的场景来说,加密和解密都是我们的服务端程序,不太需要考虑密钥的分发安全性,也就是说使用非对称加密算法没有太大的意义。在这里,我们使用对称加密算法来加密数据
对称加密常用的加密算法,有 DES、3DES 和AES;在业务代码中要避免使用 DES 加密;而 3DES 算法,是使用不同的密钥进行三次 DES 串联调用,虽然解决了 DES 不够安全的问题,但是比 AES 慢,也不太推荐
AES 是当前公认的比较安全,兼顾性能的对称加密算法。不过严格来说,AES 并不是实际的算法名称,而是算法标准
AES 有一个重要的特点就是分组加密体制,一次只能处理 128 位的明文,然后生成 128 位的密文。如果要加密很长的明文,那么就需要迭代处理,而迭代方式就叫做模式。网上很多使用 AES 来加密的代码,使用的是最简单的 ECB 模式(也叫电子密码本模式),其基本结构如下:
这种结构有两个风险:明文和密文是一一对应的,如果明文中有重复的分组,那么密文中可以观察到重复,掌握密文的规律;因为每一个分组是独立加密和解密的 ,如果密文分组的顺序,也可以反过来操纵明文,那么就可以实现不解密密文的情况下,来修改明文
对于敏感数据保存,除了选择 AES+ 合适模式进行加密外,我还推荐以下几个实践:
- 不要在代码中写死一个固定的密钥和初始化向量,最好和之前提到的盐一样,是唯一、独立并且每次都变化的
- 推荐使用独立的加密服务来管控密钥、做加密操作,千万不要把密钥和密文存在一个数据库,加密服务需要设置非常高的管控标准
- 数据库中不能保存明文的敏感信息,但可以保存脱敏的信息。普通查询的时候,直接查脱敏信息即可
第一步,对于用户姓名和身份证,我们分别保存三个信息,脱敏后的明文、密文和加密ID。加密服务加密后返回密文和加密 ID,随后使用加密 ID 来请求加密服务进行解密
1 2 3 4 5 6 7 8 9 10 11 12
@Data @Entity public class UserData { @Id private Long id; private String idcard;//脱敏的身份证 private Long idcardCipherId;//身份证加密ID private String idcardCipherText;//身份证密文 private String name;//脱敏的姓名 private Long nameCipherId;//姓名加密ID private String nameCipherText;//姓名密文 }
第二步,加密服务数据表保存加密 ID、初始化向量和密钥。加密服务表中没有密文,实现了密文和密钥分离保存
1 2 3 4 5 6 7 8 9
@Data @Entity public class CipherData { @Id @GeneratedValue(strategy = AUTO) private Long id; private String iv;//初始化向量 private String secureKey;//密钥 }
第三步,加密服务使用 GCM 模式( Galois/Counter Mode)的 AES-256 对称加密算法,也就是 AES-256-GCM。这是一种AEAD(Authenticated Encryption with Associated Data)认证加密算法,除了能实现普通加密算法提供的保密性之外,还能实现可认证性和密文完整性,是目前最推荐的 AES 模式。
使用类似 GCM 的 AEAD 算法进行加解密,除了需要提供初始化向量和密钥之外,还可以提供一个 AAD(附加认证数据,additional authenticated data),用于验证未包含在明文中的附加信息,解密时不使用加密时的 AAD 将解密失败。其实,GCM 模式的内部使用的就是 CTR 模式,只不过还使用了 GMAC 签名算法,对密文进行签名实现完整性校验
实现基于 AES-256-GCM 的加密服务,包含下面的主要逻辑:
- 加密时允许外部传入一个 AAD 用于认证,加密服务每次都会使用新生成的随机值作为密钥和初始化向量
- 在加密后,加密服务密钥和初始化向量保存到数据库中,返回加密 ID 作为本次加密的标识
- 应用解密时,需要提供加密 ID、密文和加密时的 AAD 来解密。加密服务使用加密 ID,从数据库查询出密钥和初始化向量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
@Service public class CipherService { //密钥长度 public static final int AES_KEY_SIZE = 256; //初始化向量长度 public static final int GCM_IV_LENGTH = 12; //GCM身份认证Tag长度 public static final int GCM_TAG_LENGTH = 16; @Autowired private CipherRepository cipherRepository; //内部加密方法 public static byte[] doEncrypt(byte[] plaintext, SecretKey key, byte[] iv, byte[] aad) throws Exception { //加密算法 Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding"); //Key规范 SecretKeySpec keySpec = new SecretKeySpec(key.getEncoded(), "AES"); //GCM参数规范 GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv); //加密模式 cipher.init(Cipher.ENCRYPT_MODE, keySpec, gcmParameterSpec); //设置aad if (aad != null) cipher.updateAAD(aad); //加密 byte[] cipherText = cipher.doFinal(plaintext); return cipherText; } //内部解密方法 public static String doDecrypt(byte[] cipherText, SecretKey key, byte[] iv, byte[] aad) throws Exception { //加密算法 Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding"); //Key规范 SecretKeySpec keySpec = new SecretKeySpec(key.getEncoded(), "AES"); //GCM参数规范 GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(GCM_TAG_LENGTH * 8, iv); //解密模式 cipher.init(Cipher.DECRYPT_MODE, keySpec, gcmParameterSpec); //设置aad if (aad != null) cipher.updateAAD(aad); //解密 byte[] decryptedText = cipher.doFinal(cipherText); return new String(decryptedText); } //加密入口 public CipherResult encrypt(String data, String aad) throws Exception { //加密结果 CipherResult encryptResult = new CipherResult(); //密钥生成器 KeyGenerator keyGenerator = KeyGenerator.getInstance("AES"); //生成密钥 keyGenerator.init(AES_KEY_SIZE); SecretKey key = keyGenerator.generateKey(); //IV数据 byte[] iv = new byte[GCM_IV_LENGTH]; //随机生成IV SecureRandom random = new SecureRandom(); random.nextBytes(iv); //处理aad byte[] aaddata = null; if (!StringUtils.isEmpty(aad)) aaddata = aad.getBytes(); //获得密文 encryptResult.setCipherText(Base64.getEncoder().encodeToString(doEncrypt(data.getBytes(), key, iv, aaddata))); //加密上下文数据 CipherData cipherData = new CipherData(); //保存IV cipherData.setIv(Base64.getEncoder().encodeToString(iv)); //保存密钥 cipherData.setSecureKey(Base64.getEncoder().encodeToString(key.getEncoded())); cipherRepository.save(cipherData); //返回本地加密ID encryptResult.setId(cipherData.getId()); return encryptResult; } //解密入口 public String decrypt(long cipherId, String cipherText, String aad) throws Exception { //使用加密ID找到加密上下文数据 CipherData cipherData = cipherRepository.findById(cipherId).orElseThrow(() -> new IllegalArgumentException("invlaid cipherId")); //加载密钥 byte[] decodedKey = Base64.getDecoder().decode(cipherData.getSecureKey()); //初始化密钥 SecretKey originalKey = new SecretKeySpec(decodedKey, 0, decodedKey.length, "AES"); //加载IV byte[] decodedIv = Base64.getDecoder().decode(cipherData.getIv()); //处理aad byte[] aaddata = null; if (!StringUtils.isEmpty(aad)) aaddata = aad.getBytes(); //解密 return doDecrypt(Base64.getDecoder().decode(cipherText.getBytes()), originalKey, decodedIv, aaddata); } }
第四步,分别实现加密和解密接口用于测试
我们可以让用户选择,如果需要保护二要素的话,就自己输入一个查询密码作为 AAD。系统需要读取用户敏感信息的时候,还需要用户提供这个密码,否则无法解密。这样一来,即使黑客拿到了用户数据库的密文、加密服务的密钥和 IV,也会因为缺少 AAD 无法解密
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
@RestController @Slf4j @RequestMapping("storeidcard") public class StoreIdCardController { private static final String KEY = "secretkey1234567"; private static final String initVector = "abcdefghijklmnop"; @Autowired private UserRepository userRepository; @Autowired private CipherService cipherService; private static SecretKeySpec setKey(String secret) { return new SecretKeySpec(secret.getBytes(), "AES"); } private static String idCard(String idCard) { String num = StringUtils.right(idCard, 4); return StringUtils.leftPad(num, StringUtils.length(idCard), "*"); } public static String chineseName(String chineseName) { String name = StringUtils.left(chineseName, 1); return StringUtils.rightPad(name, StringUtils.length(chineseName), "*"); } private static void test(Cipher cipher, AlgorithmParameterSpec parameterSpec) throws Exception { cipher.init(Cipher.ENCRYPT_MODE, setKey(KEY), parameterSpec); System.out.println("一次:" + Hex.encodeHexString(cipher.doFinal("abcdefghijklmnop".getBytes()))); System.out.println("两次:" + Hex.encodeHexString(cipher.doFinal("abcdefghijklmnopabcdefghijklmnop".getBytes()))); byte[] sender = "1000000000012345".getBytes(); byte[] receiver = "1000000000034567".getBytes(); byte[] money = "0000000010000000".getBytes(); //加密发送方账号 System.out.println("发送方账号:" + Hex.encodeHexString(cipher.doFinal(sender))); //加密接收方账号 System.out.println("接收方账号:" + Hex.encodeHexString(cipher.doFinal(receiver))); //加密金额 System.out.println("金额:" + Hex.encodeHexString(cipher.doFinal(money))); byte[] result = cipher.doFinal(ByteUtils.concatAll(sender, receiver, money)); //加密三个数据 System.out.println("完整数据:" + Hex.encodeHexString(result)); byte[] hack = new byte[result.length]; //把密文前两段交换 System.arraycopy(result, 16, hack, 0, 16); System.arraycopy(result, 0, hack, 16, 16); System.arraycopy(result, 32, hack, 32, 16); cipher.init(Cipher.DECRYPT_MODE, setKey(KEY), parameterSpec); //尝试解密 System.out.println("原始明文:" + new String(ByteUtils.concatAll(sender, receiver, money))); System.out.println("操纵密文:" + new String(cipher.doFinal(hack))); } @GetMapping("wrong") public UserData wrong(@RequestParam(value = "name", defaultValue = "朱晔") String name, @RequestParam(value = "idcard", defaultValue = "300000000000001234") String idCard) { UserData userData = new UserData(); userData.setId(1L); userData.setName(name); userData.setIdcard(idCard); return userRepository.save(userData); } //加密 @GetMapping("right") public UserData right(@RequestParam(value = "name", defaultValue = "朱晔") String name, @RequestParam(value = "idcard", defaultValue = "300000000000001234") String idCard, @RequestParam(value = "aad", required = false) String aad) throws Exception { UserData userData = new UserData(); userData.setId(1L); //脱敏姓名 userData.setName(chineseName(name)); //脱敏身份证 userData.setIdcard(idCard(idCard)); //加密姓名 CipherResult cipherResultName = cipherService.encrypt(name, aad); userData.setNameCipherId(cipherResultName.getId()); userData.setNameCipherText(cipherResultName.getCipherText()); //加密身份证 CipherResult cipherResultIdCard = cipherService.encrypt(idCard, aad); userData.setIdcardCipherId(cipherResultIdCard.getId()); userData.setIdcardCipherText(cipherResultIdCard.getCipherText()); return userRepository.save(userData); } //解密 @GetMapping("read") public void read(@RequestParam(value = "aad", required = false) String aad) throws Exception { UserData userData = userRepository.findById(1L).get(); log.info("name : {} idcard : {}", cipherService.decrypt(userData.getNameCipherId(), userData.getNameCipherText(), aad), cipherService.decrypt(userData.getIdcardCipherId(), userData.getIdcardCipherText(), aad)); }
- 经过这样的设计,二要素就比较安全了。黑客要查询用户二要素的话,需要同时拿到密文、IV+ 密钥、AAD。而这三者可能由三方掌管,要全部拿到比较困难
30.3:了解HTTPS
- HTTP 协议传输数据使用的是明文。那在传输敏感信息的场景下,如果客户端和服务端中间有一个黑客作为中间人拦截请求,就可以窃听到这些数据,还可以修改客户端传过来的数据。这就是很大的安全隐患
- 为解决这个安全隐患,有了 HTTPS 协议。HTTPS=SSL/TLS+HTTP,通过使用一系列加密算法来确保信息安全传输,以实现数据传输的机密性、完整性和权威性
- 机密性:使用非对称加密来加密密钥,然后使用密钥来加密数据,既安全又解决了非对称加密大量数据慢的问题。你可以做一个实验来测试两者的差距
- 完整性:使用散列算法对信息进行摘要,确保信息完整无法被中间人篡改
- 权威性:使用数字证书,来确保我们是在和合法的服务端通信
- HTTPS TLS 1.2 连接(RSA 握手)的整个过程

- 作为准备工作,网站管理员需要申请并安装 CA 证书到服务端。CA 证书中包含非对称加密的公钥、网站域名等信息,密钥是服务端自己保存的,不会在任何地方公开
- 建立 HTTPS 连接的过程,首先是 TCP 握手,然后是 TLS 握手的一系列工作,包括:
- 1:客户端告知服务端自己支持的密码套件(比如TLS_RSA_WITH_AES_256_GCM_SHA384,其中 RSA 是密钥交换的方式,AES_256_GCM 是加密算法,SHA384 是消息验证摘要算法),提供客户端随机数
- 2:服务端应答选择的密码套件,提供服务端随机数
- 3:服务端发送 CA 证书给客户端,客户端验证 CA 证书
- 4:客户端生成 PreMasterKey,并使用非对称加密 + 公钥加密 PreMasterKey
- 5:客户端把加密后的 PreMasterKey 传给服务端
- 6:服务端使用非对称加密 + 私钥解密得到 PreMasterKey,并使用 PreMasterKey+ 两个随机数,生成 MasterKey
- 7:客户端也使用 PreMasterKey+ 两个随机数生成 MasterKey
- 8:客户端告知服务端之后将进行加密传输
- 9:客户端使用 MasterKey 配合对称加密算法,进行对称加密测试
- 10:服务端也使用 MasterKey 配合对称加密算法,进行对称加密测试
- 客户端和服务端的所有通信都是加密通信,并且数据通过签名确保无法篡改。你可能会问,客户端怎么验证 CA 证书呢?其实,CA 证书是一个证书链
- 从服务端拿到的 CA 证书是用户证书,我们需要通过证书中的签发人信息找到上级中间证书,再网上找到根证书
- 根证书只有为数不多的权威机构才能生成,一般预置在 OS 中,根本无法伪造
- 找到根证书后,提取其公钥来验证中间证书的签名,判断其权威性
- 最后再拿到中间证书的公钥,验证用户证书的签名
- 验证了用户证书的合法性,然后再校验其有效期、域名等信息进一步验证有效性
- 总结一下,TLS 通过巧妙的流程和算法搭配解决了传输安全问题:使用对称加密加密数据,使用非对称加密算法确保密钥无法被中间人解密;使用 CA 证书链认证,确保中间人无法伪造自己的证书和公钥
- 如果网站涉及敏感数据的传输,必须使用 HTTPS 协议。作为用户,如果你看到网站不是HTTPS 的或者看到无效证书警告,也不应该继续使用这个网站,以免敏感信息被泄露











