diff --git a/cameleer-server-app/src/main/java/com/cameleer/server/app/license/RetentionPolicyApplier.java b/cameleer-server-app/src/main/java/com/cameleer/server/app/license/RetentionPolicyApplier.java new file mode 100644 index 00000000..33cd4afd --- /dev/null +++ b/cameleer-server-app/src/main/java/com/cameleer/server/app/license/RetentionPolicyApplier.java @@ -0,0 +1,119 @@ +package com.cameleer.server.app.license; + +import com.cameleer.server.core.license.LicenseGate; +import com.cameleer.server.core.license.LicenseLimits; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.event.EventListener; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import java.util.List; + +/** + * Recomputes ClickHouse per-environment TTL on every {@link LicenseChangedEvent}. + * + *

Spec §4.3 — when a license is installed, replaced, or expires, the effective + * retention cap may change. For each (table, env) pair this listener emits one + * {@code ALTER TABLE … MODIFY TTL WHERE environment = ''} statement + * with {@code effective = min(licenseCap, env.configuredRetentionDays)}.

+ * + *

ClickHouse 22.3+ supports per-row TTL via the {@code WHERE} predicate; the + * project's CH version (24.12) is well above that floor. ClickHouse failures are + * logged and swallowed — TTL recompute is best-effort and must not propagate + * to the originating license install/revalidate path.

+ * + *

NOTE: {@code route_diagrams} has no TTL clause in {@code init.sql} — it's a + * {@code ReplacingMergeTree} keyed on content_hash, not a time-series table — + * so it is intentionally excluded here. {@code server_metrics} has no + * {@code environment} column (server-wide) so it is also excluded; its 90-day + * cap is fixed in the schema.

+ */ +@Component +public class RetentionPolicyApplier { + + private static final Logger log = LoggerFactory.getLogger(RetentionPolicyApplier.class); + + /** (table, time column, license cap key, env-configured-days extractor). */ + private record TableSpec(String table, String timeCol, String capKey, Extractor extractor) {} + + @FunctionalInterface + private interface Extractor { + int days(Environment env); + } + + /** + * Tables with a TTL clause AND an {@code environment} column in {@code init.sql}. + * Verified against the schema at task time — keep in sync if new retention-bound + * tables are added. + */ + static final List SPECS = List.of( + new TableSpec("executions", "start_time", "max_execution_retention_days", Environment::executionRetentionDays), + new TableSpec("processor_executions", "start_time", "max_execution_retention_days", Environment::executionRetentionDays), + new TableSpec("logs", "timestamp", "max_log_retention_days", Environment::logRetentionDays), + new TableSpec("agent_metrics", "collected_at", "max_metric_retention_days", Environment::metricRetentionDays), + new TableSpec("agent_events", "timestamp", "max_metric_retention_days", Environment::metricRetentionDays) + ); + + private final LicenseGate gate; + private final EnvironmentRepository envRepo; + private final JdbcTemplate clickhouseJdbc; + + public RetentionPolicyApplier(LicenseGate gate, + EnvironmentRepository envRepo, + @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickhouseJdbc) { + this.gate = gate; + this.envRepo = envRepo; + this.clickhouseJdbc = clickhouseJdbc; + } + + @EventListener(LicenseChangedEvent.class) + @Async + public void onLicenseChanged(LicenseChangedEvent event) { + LicenseLimits limits; + try { + limits = gate.getEffectiveLimits(); + } catch (Exception e) { + log.warn("Skipping TTL recompute — could not read effective limits: {}", e.getMessage()); + return; + } + + List envs; + try { + envs = envRepo.findAll(); + } catch (Exception e) { + log.warn("Skipping TTL recompute — could not load environments: {}", e.getMessage()); + return; + } + + log.info("License changed (state={}) — recomputing TTL across {} environment(s) and {} table(s)", + event.state(), envs.size(), SPECS.size()); + + for (Environment env : envs) { + for (TableSpec spec : SPECS) { + int cap = limits.get(spec.capKey); + int configured = spec.extractor.days(env); + int effective = Math.min(cap, configured); + // Slugs are regex-validated `^[a-z0-9][a-z0-9-]{0,63}$`, so the replacement + // is defense-in-depth — single quotes can never be present. + String envLiteral = env.slug().replace("'", "''"); + String sql = "ALTER TABLE " + spec.table + + " MODIFY TTL toDateTime(" + spec.timeCol + + ") + INTERVAL " + effective + " DAY DELETE" + + " WHERE environment = '" + envLiteral + "'"; + try { + clickhouseJdbc.execute(sql); + log.info("Applied TTL: table={} env={} days={} (cap={}, configured={})", + spec.table, env.slug(), effective, cap, configured); + } catch (Exception e) { + log.warn("Failed to apply TTL for table={} env={}: {}", + spec.table, env.slug(), e.getMessage()); + } + } + } + } +} diff --git a/cameleer-server-app/src/test/java/com/cameleer/server/app/license/RetentionPolicyApplierTest.java b/cameleer-server-app/src/test/java/com/cameleer/server/app/license/RetentionPolicyApplierTest.java new file mode 100644 index 00000000..566755a4 --- /dev/null +++ b/cameleer-server-app/src/test/java/com/cameleer/server/app/license/RetentionPolicyApplierTest.java @@ -0,0 +1,194 @@ +package com.cameleer.server.app.license; + +import com.cameleer.server.core.license.LicenseGate; +import com.cameleer.server.core.license.LicenseInfo; +import com.cameleer.server.core.license.LicenseState; +import com.cameleer.server.core.runtime.Environment; +import com.cameleer.server.core.runtime.EnvironmentRepository; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.time.Instant; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class RetentionPolicyApplierTest { + + EnvironmentRepository envRepo; + JdbcTemplate ch; + LicenseGate gate; + RetentionPolicyApplier applier; + + @BeforeEach + void setUp() { + envRepo = mock(EnvironmentRepository.class); + ch = mock(JdbcTemplate.class); + gate = new LicenseGate(); + applier = new RetentionPolicyApplier(gate, envRepo, ch); + } + + private static Environment env(String slug, int execDays, int logDays, int metricDays) { + return new Environment( + UUID.randomUUID(), slug, slug, false, true, + Map.of(), null, "slate", Instant.now(), + execDays, logDays, metricDays); + } + + private void licenseWithCaps(int execCap, int logCap, int metricCap) { + LicenseInfo info = new LicenseInfo( + UUID.randomUUID(), "default", null, + Map.of( + "max_execution_retention_days", execCap, + "max_log_retention_days", logCap, + "max_metric_retention_days", metricCap + ), + Instant.now(), + Instant.now().plusSeconds(86400), + 0); + gate.load(info); + } + + private static LicenseChangedEvent ev() { + return new LicenseChangedEvent(LicenseState.ACTIVE, null); + } + + @Test + void onChange_emitsAlterPerTablePerEnv() { + licenseWithCaps(30, 30, 30); + Environment dev = env("dev", 30, 30, 30); + when(envRepo.findAll()).thenReturn(List.of(dev)); + + applier.onLicenseChanged(ev()); + + // 5 retention-bound tables defined in SPECS + ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); + verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture()); + + List all = sql.getAllValues(); + // every statement is an ALTER TABLE … MODIFY TTL … WHERE environment = 'dev' + assertThat(all).allSatisfy(s -> { + assertThat(s).startsWith("ALTER TABLE "); + assertThat(s).contains("MODIFY TTL toDateTime("); + assertThat(s).contains(" DAY DELETE"); + assertThat(s).endsWith(" WHERE environment = 'dev'"); + }); + + // Sanity-check the per-table time column wiring + assertThat(findFor(all, "executions")) + .contains("toDateTime(start_time)"); + assertThat(findFor(all, "processor_executions")) + .contains("toDateTime(start_time)"); + assertThat(findFor(all, "logs")) + .contains("toDateTime(timestamp)"); + assertThat(findFor(all, "agent_metrics")) + .contains("toDateTime(collected_at)"); + assertThat(findFor(all, "agent_events")) + .contains("toDateTime(timestamp)"); + } + + @Test + void chFailure_doesNotPropagate() { + licenseWithCaps(30, 30, 30); + when(envRepo.findAll()).thenReturn(List.of(env("dev", 30, 30, 30))); + doThrow(new RuntimeException("ch down")).when(ch).execute(anyString()); + + assertThatCode(() -> applier.onLicenseChanged(ev())) + .doesNotThrowAnyException(); + + // listener still attempted every (env, table) pair despite failures + verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(anyString()); + } + + @Test + void multipleEnvs_emitsPerEnvAlter() { + licenseWithCaps(30, 30, 30); + when(envRepo.findAll()).thenReturn(List.of( + env("dev", 30, 30, 30), + env("prod", 30, 30, 30) + )); + + applier.onLicenseChanged(ev()); + + ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); + int expected = RetentionPolicyApplier.SPECS.size() * 2; + verify(ch, times(expected)).execute(sql.capture()); + + long devCount = sql.getAllValues().stream() + .filter(s -> s.endsWith("WHERE environment = 'dev'")).count(); + long prodCount = sql.getAllValues().stream() + .filter(s -> s.endsWith("WHERE environment = 'prod'")).count(); + assertThat(devCount).isEqualTo(RetentionPolicyApplier.SPECS.size()); + assertThat(prodCount).isEqualTo(RetentionPolicyApplier.SPECS.size()); + } + + @Test + void effectiveDaysIsMinOfCapAndConfigured_capWins() { + // env wants 30 days but license caps at 7 → expect INTERVAL 7 DAY + licenseWithCaps(7, 7, 7); + when(envRepo.findAll()).thenReturn(List.of(env("dev", 30, 30, 30))); + + applier.onLicenseChanged(ev()); + + ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); + verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture()); + assertThat(sql.getAllValues()) + .allSatisfy(s -> assertThat(s).contains("INTERVAL 7 DAY DELETE")); + } + + @Test + void effectiveDaysIsMinOfCapAndConfigured_configuredWins() { + // env wants 3 days, license allows up to 7 → expect INTERVAL 3 DAY + licenseWithCaps(7, 7, 7); + when(envRepo.findAll()).thenReturn(List.of(env("dev", 3, 3, 3))); + + applier.onLicenseChanged(ev()); + + ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); + verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture()); + assertThat(sql.getAllValues()) + .allSatisfy(s -> assertThat(s).contains("INTERVAL 3 DAY DELETE")); + } + + @Test + void mixedCapAndConfigured_perTable() { + // distinct caps per axis; env exec=10, log=2, metric=50 + // exec : min(20, 10) = 10 + // log : min(20, 2) = 2 + // metric: min(20, 50) = 20 + licenseWithCaps(20, 20, 20); + when(envRepo.findAll()).thenReturn(List.of(env("dev", 10, 2, 50))); + + applier.onLicenseChanged(ev()); + + ArgumentCaptor sql = ArgumentCaptor.forClass(String.class); + verify(ch, times(RetentionPolicyApplier.SPECS.size())).execute(sql.capture()); + List all = sql.getAllValues(); + + assertThat(findFor(all, "executions")).contains("INTERVAL 10 DAY"); + assertThat(findFor(all, "processor_executions")).contains("INTERVAL 10 DAY"); + assertThat(findFor(all, "logs")).contains("INTERVAL 2 DAY"); + assertThat(findFor(all, "agent_metrics")).contains("INTERVAL 20 DAY"); + assertThat(findFor(all, "agent_events")).contains("INTERVAL 20 DAY"); + } + + /** Pick the single SQL statement that targets the given table. */ + private static String findFor(List all, String table) { + String prefix = "ALTER TABLE " + table + " "; + return all.stream() + .filter(s -> s.startsWith(prefix)) + .findFirst() + .orElseThrow(() -> new AssertionError("no SQL for table " + table + " in: " + all)); + } +}