feat(license): RetentionPolicyApplier listens on LicenseChangedEvent
@EventListener fires on every license install/replace/expire. For each environment, computes effective TTL = min(licenseCap, env.configured) and emits one ALTER TABLE ... MODIFY TTL ... per (table, env). Tables covered: executions, processor_executions, logs, agent_metrics, agent_events. ClickHouse failures are logged but do not propagate (listener is async-tolerant). route_diagrams is intentionally excluded -- it has no TTL clause in init.sql (ReplacingMergeTree keyed on content_hash, not time-series). server_metrics is also excluded -- it has no environment column (server straddles environments). Per-environment TTL via WHERE requires ClickHouse 22.3+; the project's current image (clickhouse/clickhouse-server:24.12) is well above that floor. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,119 @@
|
||||
package com.cameleer.server.app.license;
|
||||
|
||||
import com.cameleer.server.core.license.LicenseGate;
|
||||
import com.cameleer.server.core.license.LicenseLimits;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
import com.cameleer.server.core.runtime.EnvironmentRepository;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.event.EventListener;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Recomputes ClickHouse per-environment TTL on every {@link LicenseChangedEvent}.
|
||||
*
|
||||
* <p>Spec §4.3 — when a license is installed, replaced, or expires, the effective
|
||||
* retention cap may change. For each (table, env) pair this listener emits one
|
||||
* {@code ALTER TABLE … MODIFY TTL <expr> WHERE environment = '<slug>'} statement
|
||||
* with {@code effective = min(licenseCap, env.configuredRetentionDays)}.</p>
|
||||
*
|
||||
* <p>ClickHouse 22.3+ supports per-row TTL via the {@code WHERE} predicate; the
|
||||
* project's CH version (24.12) is well above that floor. ClickHouse failures are
|
||||
* logged and swallowed — TTL recompute is best-effort and must not propagate
|
||||
* to the originating license install/revalidate path.</p>
|
||||
*
|
||||
* <p>NOTE: {@code route_diagrams} has no TTL clause in {@code init.sql} — it's a
|
||||
* {@code ReplacingMergeTree} keyed on content_hash, not a time-series table —
|
||||
* so it is intentionally excluded here. {@code server_metrics} has no
|
||||
* {@code environment} column (server-wide) so it is also excluded; its 90-day
|
||||
* cap is fixed in the schema.</p>
|
||||
*/
|
||||
@Component
|
||||
public class RetentionPolicyApplier {
|
||||
|
||||
private static final Logger log = LoggerFactory.getLogger(RetentionPolicyApplier.class);
|
||||
|
||||
/** (table, time column, license cap key, env-configured-days extractor). */
|
||||
private record TableSpec(String table, String timeCol, String capKey, Extractor extractor) {}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface Extractor {
|
||||
int days(Environment env);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tables with a TTL clause AND an {@code environment} column in {@code init.sql}.
|
||||
* Verified against the schema at task time — keep in sync if new retention-bound
|
||||
* tables are added.
|
||||
*/
|
||||
static final List<TableSpec> SPECS = List.of(
|
||||
new TableSpec("executions", "start_time", "max_execution_retention_days", Environment::executionRetentionDays),
|
||||
new TableSpec("processor_executions", "start_time", "max_execution_retention_days", Environment::executionRetentionDays),
|
||||
new TableSpec("logs", "timestamp", "max_log_retention_days", Environment::logRetentionDays),
|
||||
new TableSpec("agent_metrics", "collected_at", "max_metric_retention_days", Environment::metricRetentionDays),
|
||||
new TableSpec("agent_events", "timestamp", "max_metric_retention_days", Environment::metricRetentionDays)
|
||||
);
|
||||
|
||||
private final LicenseGate gate;
|
||||
private final EnvironmentRepository envRepo;
|
||||
private final JdbcTemplate clickhouseJdbc;
|
||||
|
||||
public RetentionPolicyApplier(LicenseGate gate,
|
||||
EnvironmentRepository envRepo,
|
||||
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickhouseJdbc) {
|
||||
this.gate = gate;
|
||||
this.envRepo = envRepo;
|
||||
this.clickhouseJdbc = clickhouseJdbc;
|
||||
}
|
||||
|
||||
@EventListener(LicenseChangedEvent.class)
|
||||
@Async
|
||||
public void onLicenseChanged(LicenseChangedEvent event) {
|
||||
LicenseLimits limits;
|
||||
try {
|
||||
limits = gate.getEffectiveLimits();
|
||||
} catch (Exception e) {
|
||||
log.warn("Skipping TTL recompute — could not read effective limits: {}", e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
List<Environment> envs;
|
||||
try {
|
||||
envs = envRepo.findAll();
|
||||
} catch (Exception e) {
|
||||
log.warn("Skipping TTL recompute — could not load environments: {}", e.getMessage());
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("License changed (state={}) — recomputing TTL across {} environment(s) and {} table(s)",
|
||||
event.state(), envs.size(), SPECS.size());
|
||||
|
||||
for (Environment env : envs) {
|
||||
for (TableSpec spec : SPECS) {
|
||||
int cap = limits.get(spec.capKey);
|
||||
int configured = spec.extractor.days(env);
|
||||
int effective = Math.min(cap, configured);
|
||||
// Slugs are regex-validated `^[a-z0-9][a-z0-9-]{0,63}$`, so the replacement
|
||||
// is defense-in-depth — single quotes can never be present.
|
||||
String envLiteral = env.slug().replace("'", "''");
|
||||
String sql = "ALTER TABLE " + spec.table
|
||||
+ " MODIFY TTL toDateTime(" + spec.timeCol
|
||||
+ ") + INTERVAL " + effective + " DAY DELETE"
|
||||
+ " WHERE environment = '" + envLiteral + "'";
|
||||
try {
|
||||
clickhouseJdbc.execute(sql);
|
||||
log.info("Applied TTL: table={} env={} days={} (cap={}, configured={})",
|
||||
spec.table, env.slug(), effective, cap, configured);
|
||||
} catch (Exception e) {
|
||||
log.warn("Failed to apply TTL for table={} env={}: {}",
|
||||
spec.table, env.slug(), e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,194 @@
|
||||
package com.cameleer.server.app.license;
|
||||
|
||||
import com.cameleer.server.core.license.LicenseGate;
|
||||
import com.cameleer.server.core.license.LicenseInfo;
|
||||
import com.cameleer.server.core.license.LicenseState;
|
||||
import com.cameleer.server.core.runtime.Environment;
|
||||
import com.cameleer.server.core.runtime.EnvironmentRepository;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.springframework.jdbc.core.JdbcTemplate;
|
||||
|
||||
import java.time.Instant;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.doThrow;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
class RetentionPolicyApplierTest {
|
||||
|
||||
EnvironmentRepository envRepo;
|
||||
JdbcTemplate ch;
|
||||
LicenseGate gate;
|
||||
RetentionPolicyApplier applier;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
envRepo = mock(EnvironmentRepository.class);
|
||||
ch = mock(JdbcTemplate.class);
|
||||
gate = new LicenseGate();
|
||||
applier = new RetentionPolicyApplier(gate, envRepo, ch);
|
||||
}
|
||||
|
||||
private static Environment env(String slug, int execDays, int logDays, int metricDays) {
|
||||
return new Environment(
|
||||
UUID.randomUUID(), slug, slug, false, true,
|
||||
Map.of(), null, "slate", Instant.now(),
|
||||
execDays, logDays, metricDays);
|
||||
}
|
||||
|
||||
private void licenseWithCaps(int execCap, int logCap, int metricCap) {
|
||||
LicenseInfo info = new LicenseInfo(
|
||||
UUID.randomUUID(), "default", null,
|
||||
Map.of(
|
||||
"max_execution_retention_days", execCap,
|
||||
"max_log_retention_days", logCap,
|
||||
"max_metric_retention_days", metricCap
|
||||
),
|
||||
Instant.now(),
|
||||
Instant.now().plusSeconds(86400),
|
||||
0);
|
||||
gate.load(info);
|
||||
}
|
||||
|
||||
private static LicenseChangedEvent ev() {
|
||||
return new LicenseChangedEvent(LicenseState.ACTIVE, null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void onChange_emitsAlterPerTablePerEnv() {
|
||||
licenseWithCaps(30, 30, 30);
|
||||
Environment dev = env("dev", 30, 30, 30);
|
||||
when(envRepo.findAll()).thenReturn(List.of(dev));
|
||||
|
||||
applier.onLicenseChanged(ev());
|
||||
|
||||
// 5 retention-bound tables defined in SPECS
|
||||
ArgumentCaptor<String> sql = ArgumentCaptor.forClass(String.class);
|
||||
verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture());
|
||||
|
||||
List<String> all = sql.getAllValues();
|
||||
// every statement is an ALTER TABLE … MODIFY TTL … WHERE environment = 'dev'
|
||||
assertThat(all).allSatisfy(s -> {
|
||||
assertThat(s).startsWith("ALTER TABLE ");
|
||||
assertThat(s).contains("MODIFY TTL toDateTime(");
|
||||
assertThat(s).contains(" DAY DELETE");
|
||||
assertThat(s).endsWith(" WHERE environment = 'dev'");
|
||||
});
|
||||
|
||||
// Sanity-check the per-table time column wiring
|
||||
assertThat(findFor(all, "executions"))
|
||||
.contains("toDateTime(start_time)");
|
||||
assertThat(findFor(all, "processor_executions"))
|
||||
.contains("toDateTime(start_time)");
|
||||
assertThat(findFor(all, "logs"))
|
||||
.contains("toDateTime(timestamp)");
|
||||
assertThat(findFor(all, "agent_metrics"))
|
||||
.contains("toDateTime(collected_at)");
|
||||
assertThat(findFor(all, "agent_events"))
|
||||
.contains("toDateTime(timestamp)");
|
||||
}
|
||||
|
||||
@Test
|
||||
void chFailure_doesNotPropagate() {
|
||||
licenseWithCaps(30, 30, 30);
|
||||
when(envRepo.findAll()).thenReturn(List.of(env("dev", 30, 30, 30)));
|
||||
doThrow(new RuntimeException("ch down")).when(ch).execute(anyString());
|
||||
|
||||
assertThatCode(() -> applier.onLicenseChanged(ev()))
|
||||
.doesNotThrowAnyException();
|
||||
|
||||
// listener still attempted every (env, table) pair despite failures
|
||||
verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void multipleEnvs_emitsPerEnvAlter() {
|
||||
licenseWithCaps(30, 30, 30);
|
||||
when(envRepo.findAll()).thenReturn(List.of(
|
||||
env("dev", 30, 30, 30),
|
||||
env("prod", 30, 30, 30)
|
||||
));
|
||||
|
||||
applier.onLicenseChanged(ev());
|
||||
|
||||
ArgumentCaptor<String> sql = ArgumentCaptor.forClass(String.class);
|
||||
int expected = RetentionPolicyApplier.SPECS.size() * 2;
|
||||
verify(ch, times(expected)).execute(sql.capture());
|
||||
|
||||
long devCount = sql.getAllValues().stream()
|
||||
.filter(s -> s.endsWith("WHERE environment = 'dev'")).count();
|
||||
long prodCount = sql.getAllValues().stream()
|
||||
.filter(s -> s.endsWith("WHERE environment = 'prod'")).count();
|
||||
assertThat(devCount).isEqualTo(RetentionPolicyApplier.SPECS.size());
|
||||
assertThat(prodCount).isEqualTo(RetentionPolicyApplier.SPECS.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
void effectiveDaysIsMinOfCapAndConfigured_capWins() {
|
||||
// env wants 30 days but license caps at 7 → expect INTERVAL 7 DAY
|
||||
licenseWithCaps(7, 7, 7);
|
||||
when(envRepo.findAll()).thenReturn(List.of(env("dev", 30, 30, 30)));
|
||||
|
||||
applier.onLicenseChanged(ev());
|
||||
|
||||
ArgumentCaptor<String> sql = ArgumentCaptor.forClass(String.class);
|
||||
verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture());
|
||||
assertThat(sql.getAllValues())
|
||||
.allSatisfy(s -> assertThat(s).contains("INTERVAL 7 DAY DELETE"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void effectiveDaysIsMinOfCapAndConfigured_configuredWins() {
|
||||
// env wants 3 days, license allows up to 7 → expect INTERVAL 3 DAY
|
||||
licenseWithCaps(7, 7, 7);
|
||||
when(envRepo.findAll()).thenReturn(List.of(env("dev", 3, 3, 3)));
|
||||
|
||||
applier.onLicenseChanged(ev());
|
||||
|
||||
ArgumentCaptor<String> sql = ArgumentCaptor.forClass(String.class);
|
||||
verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture());
|
||||
assertThat(sql.getAllValues())
|
||||
.allSatisfy(s -> assertThat(s).contains("INTERVAL 3 DAY DELETE"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void mixedCapAndConfigured_perTable() {
|
||||
// distinct caps per axis; env exec=10, log=2, metric=50
|
||||
// exec : min(20, 10) = 10
|
||||
// log : min(20, 2) = 2
|
||||
// metric: min(20, 50) = 20
|
||||
licenseWithCaps(20, 20, 20);
|
||||
when(envRepo.findAll()).thenReturn(List.of(env("dev", 10, 2, 50)));
|
||||
|
||||
applier.onLicenseChanged(ev());
|
||||
|
||||
ArgumentCaptor<String> sql = ArgumentCaptor.forClass(String.class);
|
||||
verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture());
|
||||
List<String> all = sql.getAllValues();
|
||||
|
||||
assertThat(findFor(all, "executions")).contains("INTERVAL 10 DAY");
|
||||
assertThat(findFor(all, "processor_executions")).contains("INTERVAL 10 DAY");
|
||||
assertThat(findFor(all, "logs")).contains("INTERVAL 2 DAY");
|
||||
assertThat(findFor(all, "agent_metrics")).contains("INTERVAL 20 DAY");
|
||||
assertThat(findFor(all, "agent_events")).contains("INTERVAL 20 DAY");
|
||||
}
|
||||
|
||||
/** Pick the single SQL statement that targets the given table. */
|
||||
private static String findFor(List<String> all, String table) {
|
||||
String prefix = "ALTER TABLE " + table + " ";
|
||||
return all.stream()
|
||||
.filter(s -> s.startsWith(prefix))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new AssertionError("no SQL for table " + table + " in: " + all));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user