当前位置:Java -> 一个高级复杂数据迁移解决方案

一个高级复杂数据迁移解决方案

在生产系统中,有时需要实现数据迁移来添加新功能。可以使用不同的工具来完成这样的迁移。对于简单的迁移,可以使用SQL。它速度快,而且容易集成到Liquibase或其他数据库迁移管理工具中。这种解决方案适用于无法在SQL脚本中实现的用例。

用例

这个MovieManager项目在数据库中存储了访问TheMovieDB的密钥。为了改进项目,现在应该使用Tink对这些密钥进行加密存储。在数据迁移过程中,需要对现有密钥进行加密,并且在登录过程中需要对新密钥进行加密。电影导入服务需要解密密钥以在导入过程中使用。

数据迁移

更新数据库表

这个Liquibase脚本中,添加了一个"migration"列以标记在“user1”表中已迁移的行:

<changeSet id="41" author="angular2guy">
    <addColumn tableName="user1">
        <column defaultValue="0" type="bigint" name="migration"/>
    </addColumn>
</changeSet>


changeSet"user1"表添加了"migration"列,并设置了默认值"0"

执行数据迁移

数据迁移是通过CronJobs类中的startMigration(...)方法启动的:

...
private static volatile boolean migrationsDone = false;
...
@Scheduled(initialDelay = 2000, fixedRate = 36000000)
@SchedulerLock(name = "Migrations_scheduledTask", lockAtLeastFor = "PT2H", 
  lockAtMostFor = "PT3H")
public void startMigrations() {
  LOG.info("Start migrations.");
  if (!migrationsDone) {
    this.dataMigrationService.encryptUserKeys().thenApplyAsync(result -> {
      LOG.info("Users migrated: {}", result);
      return result;
    });
  }
  migrationsDone = true;
}


startMigrations()方法使用@Scheduled标注,因为这样可以使用@SchedulerLock@SchedulerLock标注设置了一个数据库锁,以限制一次只能执行一个实例,实现水平可扩展性。startMigrations()方法在启动后2秒调用一次,然后每小时调用一次@Scheduled标注。encryptUserKeys()方法返回一个CompletableFuture,可以使用thenApplyAsync(...)无阻塞地记录迁移用户的数量。静态变量migrationsDone确保每个应用实例只调用一次dataMigrationService,使得其他调用基本免费。

数据迁移

要查询UsersJpaUserRepository中有findOpenMigrations方法:

public interface JpaUserRepository extends CrudRepository<User, Long> {
  ...
  @Query("select u from User u where u.migration < :migrationId")
  List<User> findOpenMigrations(@Param(value = "migrationId") 
    Long migrationId);
}


该方法搜索未将迁移属性增加到标记为已迁移的migrationId的实体。

DataMigrationService包含encryptUserKeys()方法进行迁移:

@Service
@Transactional(propagation = Propagation.REQUIRES_NEW)
public class DataMigrationService {
...    
  @Async
  public CompletableFuture<Long> encryptUserKeys() {
    List<User> migratedUsers = this.userRepository.findOpenMigrations(1L)
      .stream().map(myUser -> {          
        myUser.setUuid(Optional.ofNullable(myUser.getUuid())
         .filter(myStr -> !myStr.isBlank())
         .orElse(UUID.randomUUID().toString()));             
        myUser.setMoviedbkey(this.userDetailService
          .encrypt(myUser.getMoviedbkey(), myUser.getUuid()));
        myUser.setMigration(myUser.getMigration() + 1);
        return myUser;
    }).collect(Collectors.toList());
    this.userRepository.saveAll(migratedUsers);
    return CompletableFuture.completedFuture(
      Integer.valueOf(migratedUsers.size()).longValue());
  }
}


该服务的注解中有Propagation.REQUIRES_NEW,以确保每个方法都封装在自己的事务中。

encryptUserKeys()方法中有Async注解,避免调用方出现超时。存储库的findOpenMigrations(...)方法返回未迁移的实体,使用map进行迁移。在map中,首先检查用户的UUID是否已设置,或者已创建并设置。然后使用UserDetailServiceencrypt(...)方法对用户密钥进行加密,并增加migration属性以表示已迁移的实体。迁移的实体被放入列表并与存储库一起保存。然后创建结果CompletableFuture返回迁移的数量。如果迁移已完成,findOpenMigrations(...)会返回一个空集合,不会进行映射或保存。

UserDetailServiceBase在其encrypt()方法中进行加密:

...
@Value("${tink.json.key}")
private String tinkJsonKey;
private DeterministicAead daead;
...
@PostConstruct
public void init() throws GeneralSecurityException {
  DeterministicAeadConfig.register();
  KeysetHandle handle = TinkJsonProtoKeysetFormat.parseKeyset(
    this.tinkJsonKey, InsecureSecretKeyAccess.get());
  this.daead = handle.getPrimitive(DeterministicAead.class);
}
...
public String encrypt(String movieDbKey, String uuid) {
  byte[] cipherBytes;
  try {
    cipherBytes = daead.encryptDeterministically(
       movieDbKey.getBytes(Charset.defaultCharset()),
       uuid.getBytes(Charset.defaultCharset()));
  } catch (GeneralSecurityException e) {
      throw new RuntimeException(e);
  }
  String cipherText = new String(Base64.getEncoder().encode(cipherBytes), 
    Charset.defaultCharset());
  return cipherText;
}


  • tinkJsonKey是一个秘钥,必须以环境变量或Helm chart值的形式注入到应用中以确保安全。
  • init()方法使用@PostConstruct标注,作为初始化运行,并注册配置并使用tinkJsonKey创建KeysetHandle。然后初始化原语。
  • encrypt(...)方法使用encryptDeterministically(...)和方法的参数创建cipherBytes。使用UUID使得每个用户有唯一的cipherBytes。结果以Base64编码形式返回为String

结论:数据迁移

这种迁移需要作为一个应用来运行,而不是作为脚本运行。这个权衡是迁移代码现在在应用中,迁移完成后就可以将其移除,但在现实世界中,这样做的时间是有限的,一段时间后就可能被忘记。另一种选择是使用类似于Spring Batch的工具,但这样做会耗费更多的精力和时间,因为JPA实体/存储库不那么容易重用。在DataMigrationService中加入一个清理方法的TODO应该能解决问题。

必须考虑一个运营约束:在迁移过程中,数据库处于不一致状态,应该停止用户访问应用程序。

最终使用密钥

MovieService 中包含 decrypt(...) 方法:

@Value("${tink.json.key}")
private String tinkJsonKey;
private DeterministicAead daead;
...
@PostConstruct
public void init() throws GeneralSecurityException {
  DeterministicAeadConfig.register();
  KeysetHandle handle = TinkJsonProtoKeysetFormat
    .parseKeyset(this.tinkJsonKey, InsecureSecretKeyAccess.get());
  this.daead = handle.getPrimitive(DeterministicAead.class);
}
...
private String decrypt(String cipherText, String uuid) 
  throws GeneralSecurityException {
  String result = new String(daead.decryptDeterministically(
    Base64.getDecoder().decode(cipherText),
    uuid.getBytes(Charset.defaultCharset())));
  return result;
}


属性和 init() 方法与加密时相同。 decrypt(...) 方法首先对 cipherText 进行 Base64 解码,然后使用结果和 UUID 来解密密钥,并将其作为 String 返回。该密钥字符串与 movieDbRestClient 方法一起用于将电影数据导入数据库。

结论

Tink 库使加密变得非常容易。 tinkJsonKey 必须在运行时注入,并且不应该在存储库文件或应用程序 jar 中。可以使用 EncryptionTestcreateKeySet() 创建 tinkJsonKey。 ShedLock 库实现了横向扩展性,并且 Spring 提供了所使用的工具箱。该解决方案试图在无法在脚本中完成的情况下,平衡用于横向扩展的数据迁移的权衡。

推荐阅读: 字节跳动面经(9)

本文链接: 一个高级复杂数据迁移解决方案