当前位置:Java -> 一个高级复杂数据迁移解决方案
在生产系统中,有时需要实现数据迁移来添加新功能。可以使用不同的工具来完成这样的迁移。对于简单的迁移,可以使用SQL。它速度快,而且容易集成到Liquibase或其他数据库迁移管理工具中。这种解决方案适用于无法在SQL脚本中实现的用例。
这个MovieManager项目在数据库中存储了访问TheMovieDB的密钥。为了改进项目,现在应该使用Tink对这些密钥进行加密存储。在数据迁移过程中,需要对现有密钥进行加密,并且在登录过程中需要对新密钥进行加密。电影导入服务需要解密密钥以在导入过程中使用。
在这个Liquibase脚本中,添加了一个"migration"
列以标记在“user1”
表中已迁移的行:
<changeSet id="41" author="angular2guy"> <addColumn tableName="user1"> <column defaultValue="0" type="bigint" name="migration"/> </addColumn> </changeSet>
changeSet
向"user1"
表添加了"migration"
列,并设置了默认值"0"
。
数据迁移是通过CronJobs
类中的startMigration(...)
方法启动的:
...
private static volatile boolean migrationsDone = false;
...
@Scheduled(initialDelay = 2000, fixedRate = 36000000)
@SchedulerLock(name = "Migrations_scheduledTask", lockAtLeastFor = "PT2H",
lockAtMostFor = "PT3H")
public void startMigrations() {
LOG.info("Start migrations.");
if (!migrationsDone) {
this.dataMigrationService.encryptUserKeys().thenApplyAsync(result -> {
LOG.info("Users migrated: {}", result);
return result;
});
}
migrationsDone = true;
}
startMigrations()
方法使用@Scheduled
标注,因为这样可以使用@SchedulerLock
。@SchedulerLock
标注设置了一个数据库锁,以限制一次只能执行一个实例,实现水平可扩展性。startMigrations()
方法在启动后2秒调用一次,然后每小时调用一次@Scheduled
标注。encryptUserKeys()
方法返回一个CompletableFuture
,可以使用thenApplyAsync(...)
无阻塞地记录迁移用户的数量。静态变量migrationsDone
确保每个应用实例只调用一次dataMigrationService
,使得其他调用基本免费。
要查询Users
,JpaUserRepository
中有findOpenMigrations
方法:
public interface JpaUserRepository extends CrudRepository<User, Long> {
...
@Query("select u from User u where u.migration < :migrationId")
List<User> findOpenMigrations(@Param(value = "migrationId")
Long migrationId);
}
该方法搜索未将迁移属性增加到标记为已迁移的migrationId
的实体。
DataMigrationService
包含encryptUserKeys()
方法进行迁移:
@Service
@Transactional(propagation = Propagation.REQUIRES_NEW)
public class DataMigrationService {
...
@Async
public CompletableFuture<Long> encryptUserKeys() {
List<User> migratedUsers = this.userRepository.findOpenMigrations(1L)
.stream().map(myUser -> {
myUser.setUuid(Optional.ofNullable(myUser.getUuid())
.filter(myStr -> !myStr.isBlank())
.orElse(UUID.randomUUID().toString()));
myUser.setMoviedbkey(this.userDetailService
.encrypt(myUser.getMoviedbkey(), myUser.getUuid()));
myUser.setMigration(myUser.getMigration() + 1);
return myUser;
}).collect(Collectors.toList());
this.userRepository.saveAll(migratedUsers);
return CompletableFuture.completedFuture(
Integer.valueOf(migratedUsers.size()).longValue());
}
}
该服务的注解中有Propagation.REQUIRES_NEW
,以确保每个方法都封装在自己的事务中。
encryptUserKeys()
方法中有Async
注解,避免调用方出现超时。存储库的findOpenMigrations(...)
方法返回未迁移的实体,使用map
进行迁移。在map
中,首先检查用户的UUID
是否已设置,或者已创建并设置。然后使用UserDetailService
的encrypt(...)
方法对用户密钥进行加密,并增加migration
属性以表示已迁移的实体。迁移的实体被放入列表并与存储库一起保存。然后创建结果CompletableFuture
返回迁移的数量。如果迁移已完成,findOpenMigrations(...)
会返回一个空集合,不会进行映射或保存。
UserDetailServiceBase
在其encrypt()
方法中进行加密:
...
@Value("${tink.json.key}")
private String tinkJsonKey;
private DeterministicAead daead;
...
@PostConstruct
public void init() throws GeneralSecurityException {
DeterministicAeadConfig.register();
KeysetHandle handle = TinkJsonProtoKeysetFormat.parseKeyset(
this.tinkJsonKey, InsecureSecretKeyAccess.get());
this.daead = handle.getPrimitive(DeterministicAead.class);
}
...
public String encrypt(String movieDbKey, String uuid) {
byte[] cipherBytes;
try {
cipherBytes = daead.encryptDeterministically(
movieDbKey.getBytes(Charset.defaultCharset()),
uuid.getBytes(Charset.defaultCharset()));
} catch (GeneralSecurityException e) {
throw new RuntimeException(e);
}
String cipherText = new String(Base64.getEncoder().encode(cipherBytes),
Charset.defaultCharset());
return cipherText;
}
tinkJsonKey
是一个秘钥,必须以环境变量或Helm chart值的形式注入到应用中以确保安全。init()
方法使用@PostConstruct
标注,作为初始化运行,并注册配置并使用tinkJsonKey
创建KeysetHandle
。然后初始化原语。encrypt(...)
方法使用encryptDeterministically(...)
和方法的参数创建cipherBytes
。使用UUID
使得每个用户有唯一的cipherBytes
。结果以Base64编码形式返回为String
。这种迁移需要作为一个应用来运行,而不是作为脚本运行。这个权衡是迁移代码现在在应用中,迁移完成后就可以将其移除,但在现实世界中,这样做的时间是有限的,一段时间后就可能被忘记。另一种选择是使用类似于Spring Batch的工具,但这样做会耗费更多的精力和时间,因为JPA实体/存储库不那么容易重用。在DataMigrationService
中加入一个清理方法的TODO应该能解决问题。
必须考虑一个运营约束:在迁移过程中,数据库处于不一致状态,应该停止用户访问应用程序。
MovieService 中包含 decrypt(...)
方法:
@Value("${tink.json.key}")
private String tinkJsonKey;
private DeterministicAead daead;
...
@PostConstruct
public void init() throws GeneralSecurityException {
DeterministicAeadConfig.register();
KeysetHandle handle = TinkJsonProtoKeysetFormat
.parseKeyset(this.tinkJsonKey, InsecureSecretKeyAccess.get());
this.daead = handle.getPrimitive(DeterministicAead.class);
}
...
private String decrypt(String cipherText, String uuid)
throws GeneralSecurityException {
String result = new String(daead.decryptDeterministically(
Base64.getDecoder().decode(cipherText),
uuid.getBytes(Charset.defaultCharset())));
return result;
}
属性和 init()
方法与加密时相同。 decrypt(...)
方法首先对 cipherText
进行 Base64 解码,然后使用结果和 UUID
来解密密钥,并将其作为 String
返回。该密钥字符串与 movieDbRestClient
方法一起用于将电影数据导入数据库。
Tink 库使加密变得非常容易。 tinkJsonKey
必须在运行时注入,并且不应该在存储库文件或应用程序 jar 中。可以使用 EncryptionTest 的 createKeySet()
创建 tinkJsonKey
。 ShedLock 库实现了横向扩展性,并且 Spring 提供了所使用的工具箱。该解决方案试图在无法在脚本中完成的情况下,平衡用于横向扩展的数据迁移的权衡。
推荐阅读: 字节跳动面经(9)
本文链接: 一个高级复杂数据迁移解决方案