票务服务是整个项目的核心,它承接着其他三个接口。其中最重要的就是余票查询和买票以及订单的取消。
余票查询的特点
必须选择起始地点和到达地点(城市或站点),必须选择一个时间(天)
只有点击查询时,才会携带着起始地点、到达地点、日期发送请求进行查询。
而对于非必选条件,实际上是在前端进行筛选,因为我们知道,列车的数据量是不大的,每次都在后端筛选也不太好,十分占用性能,不如全部都发送给前端,由前端进行筛选。
实际上非必选条件也是根据返回的列车数据中存在的特征分类聚集的。
余票查询
首先要经过一条责任链,我们先对其进行讲解。
首先是参数的校验:参数不能为空、参数的合法:起始时间不能晚于到达时间,出发地和目的地不能相同。
然后是判断出发地和目的地是否存在,其中我们需要访问redis中的一个Hash结构,里面存放了所有的城市和站点。如下:
// 数据加载标识
private static volatile boolean CACHE_DATA_ISNULL_AND_LOAD_FLAG = false;
@Override
public void handler(TicketPageQueryReqDTO requestParam) {
StringRedisTemplate stringRedisTemplate = (StringRedisTemplate) distributedCache.getInstance();
HashOperations<String, Object, Object> hashOperations = stringRedisTemplate.opsForHash();
// 从redis中拿到所有的起始和终点
List<Object> actualExistList = hashOperations.multiGet(
QUERY_ALL_REGION_LIST,
ListUtil.toList(requestParam.getFromStation(), requestParam.getToStation())
);
long emptyCount = actualExistList.stream().filter(Objects::isNull).count();
// 如果两个都存在,直接返回
if (emptyCount == 0L) {
return;
}
// 如果只有一个不存在或者
// 有两个不存在并且缓存数据已经被加载过并且key存在
// 说明站点不存在
if (emptyCount == 1L || (emptyCount == 2L && CACHE_DATA_ISNULL_AND_LOAD_FLAG && distributedCache.hasKey(QUERY_ALL_REGION_LIST))) {
throw new ClientException("出发地或目的地不存在");
}
// 如果说新增了站点,则需要将redis中的QUERY_ALL_REGION_LIST删除
// 如果有两个不存在并且(缓存数据已经被加载过或key存在)
RLock lock = redissonClient.getLock(LOCK_QUERY_ALL_REGION_LIST);
// 双重判定锁去加载数据
lock.lock();
try {
if (distributedCache.hasKey(QUERY_ALL_REGION_LIST)) {
actualExistList = hashOperations.multiGet(
QUERY_ALL_REGION_LIST,
ListUtil.toList(requestParam.getFromStation(), requestParam.getToStation())
);
emptyCount = actualExistList.stream().filter(Objects::nonNull).count();
if (emptyCount != 2L) {
throw new ClientException("出发地或目的地不存在");
}
return;
}
List<RegionDO> regionDOList = regionMapper.selectList(Wrappers.emptyWrapper());
List<StationDO> stationDOList = stationMapper.selectList(Wrappers.emptyWrapper());
HashMap<Object, Object> regionValueMap = Maps.newHashMap();
for (RegionDO each : regionDOList) {
regionValueMap.put(each.getCode(), each.getName());
}
for (StationDO each : stationDOList) {
regionValueMap.put(each.getCode(), each.getName());
}
hashOperations.putAll(QUERY_ALL_REGION_LIST, regionValueMap);
CACHE_DATA_ISNULL_AND_LOAD_FLAG = true;
// 从数据库中的数据进行判断
emptyCount = regionValueMap.keySet().stream()
.filter(each -> StrUtil.equalsAny(each.toString(), requestParam.getFromStation(), requestParam.getToStation()))
.count();
if (emptyCount != 2L) {
throw new ClientException("出发地或目的地不存在");
}
} finally {
lock.unlock();
}
}
接着就是具体的查询业务逻辑。
如上文所诉,我们的查询参数最主要的就是起始地点,到达地点,以及时间(北京,杭州东)。
由于传过来的是站点或城市的Code,所以我们首先通过redis将code转换为中文名称,注意code可能是城市也可能是站点,但最终都要映射到城市的中文名称。为了防止缓存穿透,采用分布式锁和双重判定锁来进行第一次加载。
然后获取所有在这两个地点之间的车次详细信息,从redis中获取,同样使用分布式锁和双重判定锁防止缓存穿透。
对每一个车次,根据列车id,起始站点,到达站点获取座位价格信息(是一个List,因为一个列车由很多类型的座位)
由上一步骤,对于每个座位,计算其余票情况,redis是hash,hashKey是列车id,起始站点,到达站点。k是座位类型,v是数量。如果数量是空则要调用seatMarginCacheLoader组件从数据库中进行加载计算。
public TicketPageQueryRespDTO pageListTicketQueryV1(TicketPageQueryReqDTO requestParam) {
// 责任链模式 验证城市名称是否存在、不存在加载缓存以及出发日期不能小于当前日期等等
ticketPageQueryAbstractChainContext.handler(TicketChainMarkEnum.TRAIN_QUERY_FILTER.name(), requestParam);
StringRedisTemplate stringRedisTemplate = (StringRedisTemplate) distributedCache.getInstance();
// 1.
List<Object> stationDetails = stringRedisTemplate.opsForHash()
.multiGet(REGION_TRAIN_STATION_MAPPING, Lists.newArrayList(requestParam.getFromStation(), requestParam.getToStation()));
long count = stationDetails.stream().filter(Objects::isNull).count();
if (count > 0) {
RLock lock = redissonClient.getLock(LOCK_REGION_TRAIN_STATION_MAPPING);
lock.lock();
try {
stationDetails = stringRedisTemplate.opsForHash()
.multiGet(REGION_TRAIN_STATION_MAPPING, Lists.newArrayList(requestParam.getFromStation(), requestParam.getToStation()));
count = stationDetails.stream().filter(Objects::isNull).count();
if (count > 0) {
Map<String, String> regionTrainStationMap = new HashMap<>();
stationMapper.selectList(Wrappers.emptyWrapper()).forEach(each -> regionTrainStationMap.put(each.getCode(), each.getRegionName()));
stringRedisTemplate.opsForHash().putAll(REGION_TRAIN_STATION_MAPPING, regionTrainStationMap);
stationDetails = new ArrayList<>();
stationDetails.add(regionTrainStationMap.get(requestParam.getFromStation()));
stationDetails.add(regionTrainStationMap.get(requestParam.getToStation()));
}
} finally {
lock.unlock();
}
}
// 2.
List<TicketListDTO> seatResults = new ArrayList<>();
String buildRegionTrainStationHashKey = String.format(REGION_TRAIN_STATION, stationDetails.get(0), stationDetails.get(1));
Map<Object, Object> regionTrainStationAllMap = stringRedisTemplate.opsForHash().entries(buildRegionTrainStationHashKey);
if (MapUtil.isEmpty(regionTrainStationAllMap)) {
RLock lock = redissonClient.getLock(LOCK_REGION_TRAIN_STATION);
lock.lock();
try {
regionTrainStationAllMap = stringRedisTemplate.opsForHash().entries(buildRegionTrainStationHashKey);
if (MapUtil.isEmpty(regionTrainStationAllMap)) {
List<TrainStationRelationDO> trainStationRelationList = trainStationRelationMapper.selectList(new LambdaQueryWrapper<TrainStationRelationDO>()
.eq(TrainStationRelationDO::getStartRegion, stationDetails.get(0))
.eq(TrainStationRelationDO::getEndRegion, stationDetails.get(1)));
for (TrainStationRelationDO each : trainStationRelationList) {
TrainDO trainDO = distributedCache.safeGet(
TRAIN_INFO + each.getTrainId(),
TrainDO.class,
() -> trainMapper.selectById(each.getTrainId()),
ADVANCE_TICKET_DAY,
TimeUnit.DAYS);
TicketListDTO result = new TicketListDTO();
result.setTrainId(String.valueOf(trainDO.getId()));
result.setTrainNumber(trainDO.getTrainNumber());
result.setDepartureTime(convertDateToLocalTime(each.getDepartureTime(), "HH:mm"));
result.setArrivalTime(convertDateToLocalTime(each.getArrivalTime(), "HH:mm"));
result.setDuration(DateUtil.calculateHourDifference(each.getDepartureTime(), each.getArrivalTime()));
result.setDeparture(each.getDeparture());
result.setArrival(each.getArrival());
result.setDepartureFlag(each.getDepartureFlag());
result.setArrivalFlag(each.getArrivalFlag());
result.setTrainType(trainDO.getTrainType());
result.setTrainBrand(trainDO.getTrainBrand());
if (StrUtil.isNotBlank(trainDO.getTrainTag())) {
result.setTrainTags(StrUtil.split(trainDO.getTrainTag(), ","));
}
long betweenDay = cn.hutool.core.date.DateUtil.betweenDay(each.getDepartureTime(), each.getArrivalTime(), false);
result.setDaysArrived((int) betweenDay);
result.setSaleStatus(new Date().after(trainDO.getSaleTime()) ? 0 : 1);
result.setSaleTime(convertDateToLocalTime(trainDO.getSaleTime(), "MM-dd HH:mm"));
seatResults.add(result);
regionTrainStationAllMap.put(CacheUtil.buildKey(String.valueOf(each.getTrainId()), each.getDeparture(), each.getArrival()), JSON.toJSONString(result));
}
stringRedisTemplate.opsForHash().putAll(buildRegionTrainStationHashKey, regionTrainStationAllMap);
}
} finally {
lock.unlock();
}
}
seatResults = CollUtil.isEmpty(seatResults)
? regionTrainStationAllMap.values().stream().map(each -> JSON.parseObject(each.toString(), TicketListDTO.class)).toList()
: seatResults;
seatResults = seatResults.stream().sorted(new TimeStringComparator()).toList();
for (TicketListDTO each : seatResults) {
// 3.
// 获取指定列车、出发站点、到达站点的所有类型座位的价格
String trainStationPriceStr = distributedCache.safeGet(
String.format(TRAIN_STATION_PRICE, each.getTrainId(), each.getDeparture(), each.getArrival()),
String.class,
() -> {
LambdaQueryWrapper<TrainStationPriceDO> trainStationPriceQueryWrapper = Wrappers.lambdaQuery(TrainStationPriceDO.class)
.eq(TrainStationPriceDO::getDeparture, each.getDeparture())
.eq(TrainStationPriceDO::getArrival, each.getArrival())
.eq(TrainStationPriceDO::getTrainId, each.getTrainId());
return JSON.toJSONString(trainStationPriceMapper.selectList(trainStationPriceQueryWrapper));
},
ADVANCE_TICKET_DAY,
TimeUnit.DAYS
);
List<TrainStationPriceDO> trainStationPriceDOList = JSON.parseArray(trainStationPriceStr, TrainStationPriceDO.class);
List<SeatClassDTO> seatClassList = new ArrayList<>();
// 4.
trainStationPriceDOList.forEach(item -> {
String seatType = String.valueOf(item.getSeatType());
String keySuffix = CacheUtil.buildKey(each.getTrainId(), item.getDeparture(), item.getArrival());
Object quantityObj = stringRedisTemplate.opsForHash().get(TRAIN_STATION_REMAINING_TICKET + keySuffix, seatType);
int quantity = Optional.ofNullable(quantityObj)
.map(Object::toString)
.map(Integer::parseInt)
.orElseGet(() -> {
Map<String, String> seatMarginMap = seatMarginCacheLoader.load(String.valueOf(each.getTrainId()), seatType, item.getDeparture(), item.getArrival());
return Optional.ofNullable(seatMarginMap.get(String.valueOf(item.getSeatType()))).map(Integer::parseInt).orElse(0);
});
seatClassList.add(new SeatClassDTO(item.getSeatType(), quantity, new BigDecimal(item.getPrice()).divide(new BigDecimal("100"), 1, RoundingMode.HALF_UP), false));
});
each.setSeatClassList(seatClassList);
}
return TicketPageQueryRespDTO.builder()
.trainList(seatResults)
.departureStationList(buildDepartureStationList(seatResults))
.arrivalStationList(buildArrivalStationList(seatResults))
.trainBrandList(buildTrainBrandList(seatResults))
.seatClassTypeList(buildSeatClassList(seatResults))
.build();
}
购票的特点
购票相对于其他的购物还是有不同的,主要特点如下:
首先可以支持一个用户为其他尚未注册的乘车人购票,一个订单中会存在多个子订单。
其次是在这一个订单中,所有的子订单除了座位类型和车厢有可能不同外(实际上在项目中,一个车厢中的座位类型都是相同的,可以根据车厢判断座位类型),其余的应该都相同(同一天,同一趟列车,同一起点和终点)。
购票实际上分为两部分,提交订单和付款。提交订单10分钟如果没有付款,订单会自动关闭。在这十分钟内,该座位是处于锁定状态(对查询来说是已售出,如果订单关闭就会将状态改回未售出状态)
购票策略
购票是存在一定的策略的,我们知道12306存在选座服务,如果可以,12306会优先帮助我们选择我们指定的座位,但是如果不满足就会自动分配席位。下面就讲解一下在项目中的自动分配策略。
订单中可以包含不同座位类型的子订单,我们下面的自动分配策略就针对一种座位类型来讲解,如果存在多种座位类型也是类似的。
对于多人购票,我们查找的条件是:相邻座位,相同车厢。如果无法满足相邻座位,则设法满足相同车厢,如果无法满足相同车厢,那么就随机分配。
购票
购票存在两个接口,V2对V1的互斥逻辑进行了优化,减小了锁粒度,增加了令牌桶,增大了并发量,隔绝了大部分无效请求。但是实际的购票流程是相同的。
Controller
在Controller层存在幂等检查,具体使用如下:
@ILog
@Idempotent(
uniqueKeyPrefix = "railway-ticket:lock_purchase-tickets:",
key = "T(com.zzys.railway.framework.starter.user.core.UserContext).getUsername()",
message = "正在执行下单流程,请稍后...",
scene = IdempotentSceneEnum.RESTAPI,
type = IdempotentTypeEnum.SPEL
)
@PostMapping("/api/ticket-service/ticket/purchase")
public Result<TicketPurchaseRespDTO> purchaseTickets(@RequestBody PurchaseTicketReqDTO requestParam) {
return Results.success(ticketService.purchaseTicketsV1(requestParam));
}
我们在这里使用的是幂等组件库中的基于SPEL的REST场景下的幂等处理,本质上是以用户名为标识加上分布式锁,防止重复购票。
责任链
V1和V2都会通过一条责任链:
首先是参数的判空校验,正确性校验(列车是否存在,购票时间是否正确,车站是否存在车次中,以及车站的顺序是否正确)。
然后就是判断余票是否充足,如果用户提交多个乘车人非同一座位类型,拆分验证。一个订单中的子订单只有座位类型可能不同,其他都相同。
假设下面是一个订单,那么每个子项都是一个子订单
- 列车1-北京南-南京南-车厢1-商务座-座位1
- 列车1-北京南-南京南-车厢1-商务座-座位2
- 列车1-北京南-南京南-车厢2-二等座-座位1
上面的情况就会查询以下的Redis缓存,hash的key是:列车1-北京南-南京南
- 商务座:2
- 二等座:1
V1
V1很暴力,每当一个订单在创建时,我们就对整个列车加分布式锁,也就是说,每辆列车同时只能有一个人选座。
public TicketPurchaseRespDTO purchaseTicketsV1(PurchaseTicketReqDTO requestParam) {
// 责任链模式,验证 1:参数必填 2:参数正确性 3:乘客是否已买当前车次等...
purchaseTicketAbstractChainContext.handler(TicketChainMarkEnum.TRAIN_PURCHASE_TICKET_FILTER.name(), requestParam);
String lockKey = String.format(LOCK_PURCHASE_TICKETS, requestParam.getTrainId());
RLock lock = redissonClient.getLock(lockKey);
lock.lock();
try {
return ticketService.executePurchaseTickets(requestParam);
} finally {
lock.unlock();
}
}
V2
锁粒度的改变
如果有一人选择的是二等座,另一个人选择同一辆车的商务座,我们发现其实这两个请求是可以并发执行的。所以说,我们不应该对整个列车加锁,而是对座位类型加锁。
这里又引出了死锁的问题,我们破坏环路等待条件,在加锁时,都按照座位类型按序获取(0-商务座,1-一等座,2-二等座)。这样可以避免死锁。
令牌桶
我们可以发现,假设train1-北京南-南京南-二等座一共就只有2张票,但是在V1版本下,100个请求过来买这类票,我们仍然需要对当前列车获取100次分布式锁,其实其中的98次分布式锁的获取是不必须的。我们可以利用限流算法,对无效请求进行隔离。
在这里我们使用的是类似令牌桶的算法,但是我们的令牌是不会增加的,每天会刷新令牌桶中的余票。
令牌桶实际上是Redis中的Hash结构,每一趟列车都是一个Hash。在其中包含着这一条路线中的所有逻辑路线和座位类型的笛卡尔积的余票数量。
我们使用lua脚本对每次取出一个逻辑线路的指定座位类型的影响进行改变,比较绕,举个例子:
假设我们买了列车2的从南京南到杭州东的商务座,原始bucket如下所示:
那么之后我们不仅要对南京南_杭州东_0
的余票减一,对南京南_上海虹桥_0
的余票也要减一,我们使用lua脚本保证执行的原子性。
public boolean takeTokenFromBucket(PurchaseTicketReqDTO requestParam) {
// 前面这一部分主要是对令牌桶的获取
// 以及当令牌桶不存在时,采用分布式双重判定锁初始化的步骤
TrainDO trainDO = distributedCache.safeGet(
TRAIN_INFO + requestParam.getTrainId(),
TrainDO.class,
() -> trainMapper.selectById(requestParam.getTrainId()),
ADVANCE_TICKET_DAY,
TimeUnit.DAYS);
List<RouteDTO> routeDTOList = trainStationService
.listTrainStationRoute(requestParam.getTrainId(), trainDO.getStartStation(), trainDO.getEndStation());
StringRedisTemplate stringRedisTemplate = (StringRedisTemplate) distributedCache.getInstance();
String actualHashKey = TICKET_AVAILABILITY_TOKEN_BUCKET + requestParam.getTrainId();
Boolean hasKey = distributedCache.hasKey(actualHashKey);
if (!hasKey) {
RLock lock = redissonClient.getLock(String.format(LOCK_TICKET_AVAILABILITY_TOKEN_BUCKET, requestParam.getTrainId()));
lock.lock();
try {
Boolean hasKeyTwo = distributedCache.hasKey(actualHashKey);
if (!hasKeyTwo) {
List<Integer> seatTypes = VehicleTypeEnum.findSeatTypesByCode(trainDO.getTrainType());
Map<String, String> ticketAvailabilityTokenMap = new HashMap<>();
for (RouteDTO each : routeDTOList) {
List<SeatTypeCountDTO> seatTypeCountDTOList = seatMapper.listSeatTypeCount(Long.parseLong(requestParam.getTrainId()), each.getStartStation(), each.getEndStation(), seatTypes);
for (SeatTypeCountDTO eachSeatTypeCountDTO : seatTypeCountDTOList) {
String buildCacheKey = CacheUtil.buildKey(each.getStartStation(), each.getEndStation(), String.valueOf(eachSeatTypeCountDTO.getSeatType()));
ticketAvailabilityTokenMap.put(buildCacheKey, String.valueOf(eachSeatTypeCountDTO.getSeatCount()));
}
}
stringRedisTemplate.opsForHash().putAll(TICKET_AVAILABILITY_TOKEN_BUCKET + requestParam.getTrainId(), ticketAvailabilityTokenMap);
}
} finally {
lock.unlock();
}
}
// 获取lua脚本
DefaultRedisScript<Long> actual = getLuaScript(LUA_TICKET_AVAILABILITY_TOKEN_BUCKET_PATH);
// [座位类型: 座位数量] [{0:1},{1:2}]
Map<Integer, Long> seatTypeCountMap = requestParam.getPassengers().stream()
.collect(Collectors.groupingBy(PurchaseTicketPassengerDetailDTO::getSeatType, Collectors.counting()));
// 将其转换为JSONArray形式
JSONArray seatTypeCountArray = Map2JSONArray(seatTypeCountMap);
// 获取到起始站到终点站所经过的所有逻辑路线
List<RouteDTO> takeoutRouteDTOList = trainStationService
.listTrainStationRoute(requestParam.getTrainId(), requestParam.getDeparture(), requestParam.getArrival());
String luaScriptKey = CacheUtil.buildKey(requestParam.getDeparture(), requestParam.getArrival());
// 调用lua脚本 KEY:令牌桶id,HashKey的前缀(北京南_南京南) ARGV: 座位数量集合,沿途逻辑路线
Long result = stringRedisTemplate.execute(actual, Lists.newArrayList(actualHashKey, luaScriptKey), JSON.toJSONString(seatTypeCountArray), JSON.toJSONString(takeoutRouteDTOList));
// 成功返回0
return result != null && Objects.equals(result, 0L);
}
下面是lua脚本逻辑,注释写的很详细。
-- Lua脚本,用于检查和更新票务系统中的座位可用性
-- 从Redis命令参数(KEYS数组)中提取第二个键 北京南_南京南
local inputString = KEYS[2]
-- 初始化处理键的变量
local actualKey = inputString
local colonIndex = string.find(actualKey, ":")
-- 检查键是否包含冒号,如果是,则提取冒号后的子串
if colonIndex ~= nil then
actualKey = string.sub(actualKey, colonIndex + 1)
end
-- 解码作为第一个参数传递的JSON数组(ARGV数组) [0(商务座):1(数量),1:2]
local jsonArrayStr = ARGV[1]
local jsonArray = cjson.decode(jsonArrayStr)
-- 遍历JSON数组以检查座位的可用性
for index, jsonObj in ipairs(jsonArray) do
local seatType = tonumber(jsonObj.seatType)
local count = tonumber(jsonObj.count)
-- 北京南_南京南_0
local actualInnerHashKey = actualKey .. "_" .. seatType
-- 从Redis哈希中获取当前座位可用性令牌值,KEYS[1]是当前列车的令牌桶key
local ticketSeatAvailabilityTokenValue = tonumber(redis.call('hget', KEYS[1], tostring(actualInnerHashKey)))
-- 检查可用座位是否少于请求的数量
if ticketSeatAvailabilityTokenValue < count then
return 1 -- 表示座位不足
end
end
-- 解码作为第二个参数传递的第二个JSON数组(ARGV数组)[北京南_南京南_0,南京南_上海虹桥_0]
local alongJsonArrayStr = ARGV[2]
local alongJsonArray = cjson.decode(alongJsonArrayStr)
-- 遍历第一个JSON数组并在Redis哈希中更新座位的可用性
for index, jsonObj in ipairs(jsonArray) do
local seatType = tonumber(jsonObj.seatType)
local count = tonumber(jsonObj.count)
-- 遍历第二个JSON数组以获取起始和终点站
for indexTwo, alongJsonObj in ipairs(alongJsonArray) do
local startStation = tostring(alongJsonObj.startStation)
local endStation = tostring(alongJsonObj.endStation)
local actualInnerHashKey = startStation .. "_" .. endStation .. "_" .. seatType
-- 根据起始、终点站和座位类型在Redis哈希中减少座位计数
redis.call('hincrby', KEYS[1], tostring(actualInnerHashKey), -count)
end
end
-- 返回0表示成功检查和更新座位的可用性
return 0
本地锁
在分布式以及高并发的环境下,我们会发现每一次的有效请求都会向redis发起一次获取分布式锁的请求,我们可以让本地的线程先去争夺本地的锁,本地上锁成功后再去获取分布式锁。
我们可以类别,如果一个年纪有1000个人,有十个班,那么每个班就有100个人。当前要评审奖项,如果每个人都直接向年纪长发送请求,年纪长可能会忙不过来。所以我们可以让一个班的先去班级长处发送请求,在班级长这里获得通过后,再向年纪长发送请求,有点像缓存的概念。
但是这里又引出了死锁的问题,我们还是可以破坏环路等待的条件,对要获取的锁进行排序,首先每次先获取本地锁再获取分布式锁。
接着我们思考本地怎么存放锁,如果使用ConcurrentHashMap,我们会发现一旦我们删除列车数据或删除座位类型,我们没有很好的解决办法去删除对应的锁,最终可能占用内存过大或oom。所以我们采用了本地缓存组件Caffeine,设置过期时间为一天。
其实这个优化是不必须的,因为我们使用了令牌桶,有效的请求不会太多,这样Redis作为性能数据库,应该还是能够抗住这波削减后的流量的。
End
下面就是运用到以上三点后实现的V2版本,首先会去令牌桶获取令牌,隔绝大部分请求,接着就是对多把锁的获取,包括双重判定锁来初始化本地锁(有点绕),采用的是类对象来加锁
public TicketPurchaseRespDTO purchaseTicketsV2(PurchaseTicketReqDTO requestParam) { purchaseTicketAbstractChainContext.handler(TicketChainMarkEnum.TRAIN_PURCHASE_TICKET_FILTER.name(), requestParam);
boolean tokenResult = ticketAvailabilityTokenBucket.takeTokenFromBucket(requestParam);
if (!tokenResult) {
throw new ServiceException("列车站点已无余票");
}
List<ReentrantLock> localLockList = new ArrayList<>();
List<RLock> distributedLockList = new ArrayList<>();
Map<Integer, List<PurchaseTicketPassengerDetailDTO>> seatTypeMap = requestParam.getPassengers().stream()
.collect(Collectors.groupingBy(PurchaseTicketPassengerDetailDTO::getSeatType));
List<Map.Entry<Integer, List<PurchaseTicketPassengerDetailDTO>>> entryList = new ArrayList<>(seatTypeMap.entrySet());
entryList.sort(Map.Entry.comparingByKey());
entryList.forEach((entry) -> {
Integer searType = entry.getKey();
String lockKey = environment.resolvePlaceholders(String.format(LOCK_PURCHASE_TICKETS_V2, requestParam.getTrainId(), searType));
ReentrantLock localLock = localLockMap.getIfPresent(lockKey);
if (localLock == null) {
synchronized (TicketService.class) {
if ((localLock = localLockMap.getIfPresent(lockKey)) == null) {
localLock = new ReentrantLock(true);
localLockMap.put(lockKey, localLock);
}
}
}
localLockList.add(localLock);
RLock distributedLock = redissonClient.getFairLock(lockKey);
distributedLockList.add(distributedLock);
});
try {
localLockList.forEach(ReentrantLock::lock);
distributedLockList.forEach(RLock::lock);
return ticketService.executePurchaseTickets(requestParam);
} finally {
localLockList.forEach(localLock -> {
try {
localLock.unlock();
} catch (Throwable ignored) {
}
});
distributedLockList.forEach(distributedLock -> {
try {
distributedLock.unlock();
} catch (Throwable ignored) {
}
});
}
}
购票流程
executePurchaseTickets
方法就是具体的购票流程,V1和V2版本都是相同的,这里需要注意不能直接调用方法,否则会事务失效,需要注入。
由于代码大多数都是装填对象,我就直接说一下思路:
- 首先通过TrainSeatTypeSelector#select方法自动分配座位(TrainSeatTypeSelector是自定义的座位选择器),同时对座位上锁。
- 创建票务对象,批量插入数据库。
- RPC调用Order服务。
TrainSeatTypeSelector
我们这里来说一下座位选择器,我们只了解它的选择运行架构,而不去真正的学习它的选择底层实现(代码太长了)。
如果一个订单中有超过两种座位类型选票,那么我们会采用线程池去计算不同座位类型的选座(选座算法的时间复杂度较高,同时也存在多次的数据库读),同时采用并行流获取数据。
分配完毕座位后,我们会去远程调用一下用户服务,判断每个乘车人都已经在数据库中登记过并且审核过。
我们对座位进行锁定。
对于第一点,我们的更详细的操作步骤是:
调用选座算法进行选座
扣减余票缓存,但是我们需要先判断一下缓存更新的类型,如果项目采用的是监听binlog+RocketMQ,那么我们就无需手动扣减,我们扣减的就是下图的Redis结构: