From a188308ec530f4426976f9985e9f91021f555ba7 Mon Sep 17 00:00:00 2001 From: hsiegeln <37154749+hsiegeln@users.noreply.github.com> Date: Sat, 4 Apr 2026 15:00:18 +0200 Subject: [PATCH] feat: implement multitenancy with tenant isolation + environment support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds configurable tenant ID (CAMELEER_TENANT_ID env var, default: "default") and environment as a first-class concept. Each server instance serves one tenant with multiple environments. Changes across 36 files: - TenantProperties config bean for tenant ID injection - AgentInfo: added environmentId field - AgentRegistrationRequest: added environmentId field - All 9 ClickHouse stores: inject tenant ID, replace hardcoded "default" constant, add environment to writes/reads - ChunkAccumulator: configurable tenant ID + environment resolver - MergedExecution/ProcessorBatch/BufferedLogEntry: added environment - ClickHouse init.sql: added environment column to all tables, updated ORDER BY (tenant→time→env→app), added tenant_id to usage_events, updated all MV GROUP BY clauses - Controllers: pass environmentId through registration/auto-heal - K8s deploy: added CAMELEER_TENANT_ID env var - All tests updated for new signatures Closes #123 Co-Authored-By: Claude Opus 4.6 (1M context) --- .../server/app/config/StorageBeanConfig.java | 41 +++++++++---- .../server/app/config/TenantProperties.java | 19 +++++++ .../AgentRegistrationController.java | 6 +- .../app/controller/AgentSseController.java | 2 +- .../controller/LogIngestionController.java | 15 ++++- .../app/dto/AgentRegistrationRequest.java | 1 + .../server/app/search/ClickHouseLogStore.java | 57 ++++++++++--------- .../app/search/ClickHouseSearchIndex.java | 23 +++++--- .../ClickHouseAgentEventRepository.java | 10 ++-- .../app/storage/ClickHouseDiagramStore.java | 18 +++--- .../app/storage/ClickHouseExecutionStore.java | 29 +++++----- .../storage/ClickHouseMetricsQueryStore.java | 8 ++- .../app/storage/ClickHouseMetricsStore.java | 9 ++- .../app/storage/ClickHouseStatsStore.java | 34 +++++------ .../app/storage/ClickHouseUsageTracker.java | 19 ++++--- .../src/main/resources/application.yml | 2 + .../src/main/resources/clickhouse/init.sql | 52 +++++++++++------ .../server/app/TestSecurityHelper.java | 2 +- .../app/search/ClickHouseLogStoreIT.java | 2 +- .../app/search/ClickHouseSearchIndexIT.java | 10 ++-- .../ClickHouseAgentEventRepositoryIT.java | 2 +- .../storage/ClickHouseChunkPipelineIT.java | 7 ++- .../app/storage/ClickHouseDiagramStoreIT.java | 2 +- .../storage/ClickHouseExecutionReadIT.java | 4 +- .../storage/ClickHouseExecutionStoreIT.java | 8 +-- .../ClickHouseMetricsQueryStoreIT.java | 2 +- .../app/storage/ClickHouseMetricsStoreIT.java | 2 +- .../app/storage/ClickHouseStatsStoreIT.java | 2 +- .../server/core/agent/AgentInfo.java | 18 +++--- .../core/agent/AgentRegistryService.java | 10 ++-- .../core/ingestion/BufferedLogEntry.java | 2 + .../core/ingestion/ChunkAccumulator.java | 23 ++++++-- .../core/ingestion/MergedExecution.java | 1 + .../core/agent/AgentRegistryServiceTest.java | 48 ++++++++-------- .../core/ingestion/ChunkAccumulatorTest.java | 6 +- deploy/base/server.yaml | 2 + 36 files changed, 310 insertions(+), 188 deletions(-) create mode 100644 cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/TenantProperties.java diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java index 913637ff..9a440224 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/StorageBeanConfig.java @@ -10,6 +10,8 @@ import com.cameleer3.server.app.storage.ClickHouseStatsStore; import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.agent.AgentEventRepository; +import com.cameleer3.server.core.agent.AgentInfo; +import com.cameleer3.server.core.agent.AgentRegistryService; import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.indexing.SearchIndexer; import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; @@ -63,34 +65,45 @@ public class StorageBeanConfig { @Bean public MetricsStore clickHouseMetricsStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseMetricsStore(clickHouseJdbc); + return new ClickHouseMetricsStore(tenantProperties.getId(), clickHouseJdbc); } @Bean public MetricsQueryStore clickHouseMetricsQueryStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseMetricsQueryStore(clickHouseJdbc); + return new ClickHouseMetricsQueryStore(tenantProperties.getId(), clickHouseJdbc); } // ── Execution Store ────────────────────────────────────────────────── @Bean public ClickHouseExecutionStore clickHouseExecutionStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseExecutionStore(clickHouseJdbc); + return new ClickHouseExecutionStore(tenantProperties.getId(), clickHouseJdbc); } @Bean public ChunkAccumulator chunkAccumulator( + TenantProperties tenantProperties, WriteBuffer executionBuffer, WriteBuffer processorBatchBuffer, - DiagramStore diagramStore) { + DiagramStore diagramStore, + AgentRegistryService registryService) { return new ChunkAccumulator( + tenantProperties.getId(), executionBuffer::offerOrWarn, processorBatchBuffer::offerOrWarn, diagramStore, - java.time.Duration.ofMinutes(5)); + java.time.Duration.ofMinutes(5), + instanceId -> { + AgentInfo agent = registryService.findById(instanceId); + return agent != null && agent.environmentId() != null + ? agent.environmentId() : "default"; + }); } @Bean @@ -108,40 +121,45 @@ public class StorageBeanConfig { @Bean public SearchIndex clickHouseSearchIndex( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseSearchIndex(clickHouseJdbc); + return new ClickHouseSearchIndex(tenantProperties.getId(), clickHouseJdbc); } // ── ClickHouse Stats Store ───────────────────────────────────────── @Bean public StatsStore clickHouseStatsStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseStatsStore(clickHouseJdbc); + return new ClickHouseStatsStore(tenantProperties.getId(), clickHouseJdbc); } // ── ClickHouse Diagram Store ────────────────────────────────────── @Bean public DiagramStore clickHouseDiagramStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseDiagramStore(clickHouseJdbc); + return new ClickHouseDiagramStore(tenantProperties.getId(), clickHouseJdbc); } // ── ClickHouse Agent Event Repository ───────────────────────────── @Bean public AgentEventRepository clickHouseAgentEventRepository( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseAgentEventRepository(clickHouseJdbc); + return new ClickHouseAgentEventRepository(tenantProperties.getId(), clickHouseJdbc); } // ── ClickHouse Log Store ────────────────────────────────────────── @Bean public ClickHouseLogStore clickHouseLogStore( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseLogStore(clickHouseJdbc); + return new ClickHouseLogStore(tenantProperties.getId(), clickHouseJdbc); } // ── Usage Analytics ────────────────────────────────────────────── @@ -149,8 +167,9 @@ public class StorageBeanConfig { @Bean @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") public ClickHouseUsageTracker clickHouseUsageTracker( + TenantProperties tenantProperties, @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { - return new ClickHouseUsageTracker(clickHouseJdbc, + return new ClickHouseUsageTracker(tenantProperties.getId(), clickHouseJdbc, new com.cameleer3.server.core.ingestion.WriteBuffer<>(5000)); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/TenantProperties.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/TenantProperties.java new file mode 100644 index 00000000..d4e8d617 --- /dev/null +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/config/TenantProperties.java @@ -0,0 +1,19 @@ +package com.cameleer3.server.app.config; + +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Component +@ConfigurationProperties(prefix = "cameleer.tenant") +public class TenantProperties { + + private String id = "default"; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java index ce586702..4cf67c2b 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentRegistrationController.java @@ -115,11 +115,13 @@ public class AgentRegistrationController { } String application = request.applicationId() != null ? request.applicationId() : "default"; + String environmentId = request.environmentId() != null ? request.environmentId() : "default"; List routeIds = request.routeIds() != null ? request.routeIds() : List.of(); var capabilities = request.capabilities() != null ? request.capabilities() : Collections.emptyMap(); AgentInfo agent = registryService.register( - request.instanceId(), request.displayName(), application, request.version(), routeIds, capabilities); + request.instanceId(), request.displayName(), application, environmentId, + request.version(), routeIds, capabilities); log.info("Agent registered: {} (name={}, application={})", request.instanceId(), request.displayName(), application); agentEventService.recordEvent(request.instanceId(), application, "REGISTERED", @@ -210,7 +212,7 @@ public class AgentRegistrationController { if (jwtResult != null) { String application = jwtResult.application() != null ? jwtResult.application() : "default"; Map caps = capabilities != null ? capabilities : Map.of(); - registryService.register(id, id, application, "unknown", + registryService.register(id, id, application, "default", "unknown", List.of(), caps); registryService.heartbeat(id); log.info("Auto-registered agent {} (app={}) from heartbeat after server restart", id, application); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java index 4c568a7e..548674cb 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/AgentSseController.java @@ -67,7 +67,7 @@ public class AgentSseController { JwtAuthenticationFilter.JWT_RESULT_ATTR); if (jwtResult != null) { String application = jwtResult.application() != null ? jwtResult.application() : "default"; - registryService.register(id, id, application, "unknown", List.of(), Map.of()); + registryService.register(id, id, application, "default", "unknown", List.of(), Map.of()); log.info("Auto-registered agent {} (app={}) from SSE connect after server restart", id, application); } else { throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java index 94092d9a..8b9b9b7d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/controller/LogIngestionController.java @@ -8,6 +8,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.tags.Tag; +import com.cameleer3.server.app.config.TenantProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.ResponseEntity; @@ -27,11 +28,14 @@ public class LogIngestionController { private final WriteBuffer logBuffer; private final AgentRegistryService registryService; + private final TenantProperties tenantProperties; public LogIngestionController(WriteBuffer logBuffer, - AgentRegistryService registryService) { + AgentRegistryService registryService, + TenantProperties tenantProperties) { this.logBuffer = logBuffer; this.registryService = registryService; + this.tenantProperties = tenantProperties; } @PostMapping("/logs") @@ -44,8 +48,10 @@ public class LogIngestionController { if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); + String environment = resolveEnvironment(instanceId); for (var entry : batch.getEntries()) { - logBuffer.offerOrWarn(new BufferedLogEntry(instanceId, applicationId, entry)); + logBuffer.offerOrWarn(new BufferedLogEntry( + tenantProperties.getId(), environment, instanceId, applicationId, entry)); } } @@ -61,4 +67,9 @@ public class LogIngestionController { AgentInfo agent = registryService.findById(instanceId); return agent != null ? agent.applicationId() : ""; } + + private String resolveEnvironment(String instanceId) { + AgentInfo agent = registryService.findById(instanceId); + return agent != null && agent.environmentId() != null ? agent.environmentId() : "default"; + } } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/AgentRegistrationRequest.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/AgentRegistrationRequest.java index 88fe83f8..f2d0cc71 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/AgentRegistrationRequest.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/dto/AgentRegistrationRequest.java @@ -11,6 +11,7 @@ public record AgentRegistrationRequest( @NotNull String instanceId, @NotNull String displayName, @Schema(defaultValue = "default") String applicationId, + @Schema(defaultValue = "default") String environmentId, String version, List routeIds, Map capabilities diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java index 5353c5e0..68ca0392 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseLogStore.java @@ -29,12 +29,13 @@ import java.util.Map; public class ClickHouseLogStore implements LogIndex { private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class); - private static final String TENANT = "default"; private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT; + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseLogStore(JdbcTemplate jdbc) { + public ClickHouseLogStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @@ -46,23 +47,24 @@ public class ClickHouseLogStore implements LogIndex { String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); - ps.setTimestamp(1, Timestamp.from(ts)); - ps.setString(2, applicationId); - ps.setString(3, instanceId); - ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); - ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); - ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); - ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); - ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); + ps.setString(1, tenantId); + ps.setTimestamp(2, Timestamp.from(ts)); + ps.setString(3, applicationId); + ps.setString(4, instanceId); + ps.setString(5, entry.getLevel() != null ? entry.getLevel() : ""); + ps.setString(6, entry.getLoggerName() != null ? entry.getLoggerName() : ""); + ps.setString(7, entry.getMessage() != null ? entry.getMessage() : ""); + ps.setString(8, entry.getThreadName() != null ? entry.getThreadName() : ""); + ps.setString(9, entry.getStackTrace() != null ? entry.getStackTrace() : ""); Map mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); - ps.setString(9, exchangeId); - ps.setObject(10, mdc); + ps.setString(10, exchangeId); + ps.setObject(11, mdc); }); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); @@ -71,26 +73,28 @@ public class ClickHouseLogStore implements LogIndex { public void insertBufferedBatch(List entries) { if (entries.isEmpty()) return; - String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + + String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + - "VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { LogEntry entry = ble.entry(); Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); - ps.setTimestamp(1, Timestamp.from(ts)); - ps.setString(2, ble.applicationId()); - ps.setString(3, ble.instanceId()); - ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); - ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); - ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); - ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); - ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); + ps.setString(1, ble.tenantId() != null ? ble.tenantId() : tenantId); + ps.setString(2, ble.environment() != null ? ble.environment() : "default"); + ps.setTimestamp(3, Timestamp.from(ts)); + ps.setString(4, ble.applicationId()); + ps.setString(5, ble.instanceId()); + ps.setString(6, entry.getLevel() != null ? entry.getLevel() : ""); + ps.setString(7, entry.getLoggerName() != null ? entry.getLoggerName() : ""); + ps.setString(8, entry.getMessage() != null ? entry.getMessage() : ""); + ps.setString(9, entry.getThreadName() != null ? entry.getThreadName() : ""); + ps.setString(10, entry.getStackTrace() != null ? entry.getStackTrace() : ""); Map mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); - ps.setString(9, exchangeId); - ps.setObject(10, mdc); + ps.setString(11, exchangeId); + ps.setObject(12, mdc); }); log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); @@ -101,7 +105,8 @@ public class ClickHouseLogStore implements LogIndex { // Build shared WHERE conditions (used by both data and count queries) List baseConditions = new ArrayList<>(); List baseParams = new ArrayList<>(); - baseConditions.add("tenant_id = 'default'"); + baseConditions.add("tenant_id = ?"); + baseParams.add(tenantId); if (request.application() != null && !request.application().isEmpty()) { baseConditions.add("application = ?"); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java index a64dc3f5..bf630401 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/search/ClickHouseSearchIndex.java @@ -44,9 +44,11 @@ public class ClickHouseSearchIndex implements SearchIndex { "applicationId", "application_id" ); + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseSearchIndex(JdbcTemplate jdbc) { + public ClickHouseSearchIndex(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @@ -118,7 +120,8 @@ public class ClickHouseSearchIndex implements SearchIndex { private String buildWhereClause(SearchRequest request, List params) { List conditions = new ArrayList<>(); - conditions.add("tenant_id = 'default'"); + conditions.add("tenant_id = ?"); + params.add(tenantId); if (request.timeFrom() != null) { conditions.add("start_time >= ?"); @@ -186,11 +189,12 @@ public class ClickHouseSearchIndex implements SearchIndex { conditions.add("(execution_id = ? OR correlation_id = ? OR exchange_id = ?" + " OR _search_text LIKE ? OR execution_id IN (" + "SELECT DISTINCT execution_id FROM processor_executions " - + "WHERE tenant_id = 'default' AND _search_text LIKE ?))"); + + "WHERE tenant_id = ? AND _search_text LIKE ?))"); params.add(term); params.add(term); params.add(term); params.add(likeTerm); + params.add(tenantId); params.add(likeTerm); } @@ -199,7 +203,8 @@ public class ClickHouseSearchIndex implements SearchIndex { String likeTerm = "%" + escapeLike(request.textInBody()) + "%"; conditions.add("execution_id IN (" + "SELECT DISTINCT execution_id FROM processor_executions " - + "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))"); + + "WHERE tenant_id = ? AND (input_body LIKE ? OR output_body LIKE ?))"); + params.add(tenantId); params.add(likeTerm); params.add(likeTerm); } @@ -209,7 +214,8 @@ public class ClickHouseSearchIndex implements SearchIndex { String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%"; conditions.add("execution_id IN (" + "SELECT DISTINCT execution_id FROM processor_executions " - + "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))"); + + "WHERE tenant_id = ? AND (input_headers LIKE ? OR output_headers LIKE ?))"); + params.add(tenantId); params.add(likeTerm); params.add(likeTerm); } @@ -219,9 +225,10 @@ public class ClickHouseSearchIndex implements SearchIndex { String likeTerm = "%" + escapeLike(request.textInErrors()) + "%"; conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN (" + "SELECT DISTINCT execution_id FROM processor_executions " - + "WHERE tenant_id = 'default' AND (error_message LIKE ? OR error_stacktrace LIKE ?)))"); + + "WHERE tenant_id = ? AND (error_message LIKE ? OR error_stacktrace LIKE ?)))"); params.add(likeTerm); params.add(likeTerm); + params.add(tenantId); params.add(likeTerm); params.add(likeTerm); } @@ -311,9 +318,9 @@ public class ClickHouseSearchIndex implements SearchIndex { return jdbc.queryForList(""" SELECT DISTINCT arrayJoin(JSONExtractKeys(attributes)) AS attr_key FROM executions FINAL - WHERE tenant_id = 'default' AND attributes != '' AND attributes != '{}' + WHERE tenant_id = ? AND attributes != '' AND attributes != '{}' ORDER BY attr_key - """, String.class); + """, String.class, tenantId); } catch (Exception e) { log.error("Failed to query distinct attribute keys", e); return List.of(); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java index 84b0c41f..6c69d7e1 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepository.java @@ -17,30 +17,30 @@ import java.util.List; */ public class ClickHouseAgentEventRepository implements AgentEventRepository { - private static final String TENANT = "default"; - private static final String INSERT_SQL = "INSERT INTO agent_events (tenant_id, instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?, ?)"; private static final String SELECT_BASE = "SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?"; + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseAgentEventRepository(JdbcTemplate jdbc) { + public ClickHouseAgentEventRepository(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @Override public void insert(String instanceId, String applicationId, String eventType, String detail) { - jdbc.update(INSERT_SQL, TENANT, instanceId, applicationId, eventType, detail); + jdbc.update(INSERT_SQL, tenantId, instanceId, applicationId, eventType, detail); } @Override public List query(String applicationId, String instanceId, Instant from, Instant to, int limit) { var sql = new StringBuilder(SELECT_BASE); var params = new ArrayList(); - params.add(TENANT); + params.add(tenantId); if (applicationId != null) { sql.append(" AND application_id = ?"); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java index 97c47dad..41fb0bdb 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseDiagramStore.java @@ -39,8 +39,6 @@ public class ClickHouseDiagramStore implements DiagramStore { private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class); - private static final String TENANT = "default"; - private static final String INSERT_SQL = """ INSERT INTO route_diagrams (tenant_id, content_hash, route_id, instance_id, application_id, definition, created_at) @@ -64,6 +62,7 @@ public class ClickHouseDiagramStore implements DiagramStore { WHERE tenant_id = ? AND application_id = ? """; + private final String tenantId; private final JdbcTemplate jdbc; private final ObjectMapper objectMapper; @@ -72,7 +71,8 @@ public class ClickHouseDiagramStore implements DiagramStore { // contentHash → deserialized RouteGraph private final ConcurrentHashMap graphCache = new ConcurrentHashMap<>(); - public ClickHouseDiagramStore(JdbcTemplate jdbc) { + public ClickHouseDiagramStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; this.objectMapper = new ObjectMapper(); this.objectMapper.registerModule(new JavaTimeModule()); @@ -87,7 +87,7 @@ public class ClickHouseDiagramStore implements DiagramStore { String key = rs.getString("route_id") + "\0" + rs.getString("instance_id"); hashCache.put(key, rs.getString("content_hash")); }, - TENANT); + tenantId); log.info("Diagram hash cache warmed: {} entries", hashCache.size()); } catch (Exception e) { log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage()); @@ -109,7 +109,7 @@ public class ClickHouseDiagramStore implements DiagramStore { String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; jdbc.update(INSERT_SQL, - TENANT, + tenantId, contentHash, routeId, agentId, @@ -134,7 +134,7 @@ public class ClickHouseDiagramStore implements DiagramStore { return Optional.of(cached); } - List> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash); + List> rows = jdbc.queryForList(SELECT_BY_HASH, tenantId, contentHash); if (rows.isEmpty()) { return Optional.empty(); } @@ -157,7 +157,7 @@ public class ClickHouseDiagramStore implements DiagramStore { } List> rows = jdbc.queryForList( - SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId); + SELECT_HASH_FOR_ROUTE, tenantId, routeId, agentId); if (rows.isEmpty()) { return Optional.empty(); } @@ -186,7 +186,7 @@ public class ClickHouseDiagramStore implements DiagramStore { "WHERE tenant_id = ? AND route_id = ? AND instance_id IN (" + placeholders + ") " + "ORDER BY created_at DESC LIMIT 1"; var params = new ArrayList(); - params.add(TENANT); + params.add(tenantId); params.add(routeId); params.addAll(agentIds); List> rows = jdbc.queryForList(sql, params.toArray()); @@ -200,7 +200,7 @@ public class ClickHouseDiagramStore implements DiagramStore { public Map findProcessorRouteMapping(String applicationId) { Map mapping = new HashMap<>(); List> rows = jdbc.queryForList( - SELECT_DEFINITIONS_FOR_APP, TENANT, applicationId); + SELECT_DEFINITIONS_FOR_APP, tenantId, applicationId); for (Map row : rows) { String routeId = (String) row.get("route_id"); String json = (String) row.get("definition"); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java index 061f2597..d2e2b12d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseExecutionStore.java @@ -18,14 +18,16 @@ import java.util.Optional; public class ClickHouseExecutionStore implements ExecutionStore { + private final String tenantId; private final JdbcTemplate jdbc; private final ObjectMapper objectMapper; - public ClickHouseExecutionStore(JdbcTemplate jdbc) { - this(jdbc, new ObjectMapper()); + public ClickHouseExecutionStore(String tenantId, JdbcTemplate jdbc) { + this(tenantId, jdbc, new ObjectMapper()); } - public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) { + public ClickHouseExecutionStore(String tenantId, JdbcTemplate jdbc, ObjectMapper objectMapper) { + this.tenantId = tenantId; this.jdbc = jdbc; this.objectMapper = objectMapper; } @@ -36,14 +38,14 @@ public class ClickHouseExecutionStore implements ExecutionStore { jdbc.batchUpdate(""" INSERT INTO executions ( tenant_id, _version, execution_id, route_id, instance_id, application_id, - status, correlation_id, exchange_id, start_time, end_time, duration_ms, + environment, status, correlation_id, exchange_id, start_time, end_time, duration_ms, error_message, error_stacktrace, error_type, error_category, root_cause_type, root_cause_message, diagram_content_hash, engine_level, input_body, output_body, input_headers, output_headers, attributes, trace_id, span_id, has_trace_data, is_replay, original_exchange_id, replay_exchange_id ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, executions.stream().map(e -> new Object[]{ nullToEmpty(e.tenantId()), @@ -52,6 +54,7 @@ public class ClickHouseExecutionStore implements ExecutionStore { nullToEmpty(e.routeId()), nullToEmpty(e.instanceId()), nullToEmpty(e.applicationId()), + nullToEmpty(e.environment()), nullToEmpty(e.status()), nullToEmpty(e.correlationId()), nullToEmpty(e.exchangeId()), @@ -199,11 +202,11 @@ public class ClickHouseExecutionStore implements ExecutionStore { error_type, error_category, root_cause_type, root_cause_message, trace_id, span_id, has_trace_data, is_replay FROM executions FINAL - WHERE tenant_id = 'default' AND execution_id = ? + WHERE tenant_id = ? AND execution_id = ? LIMIT 1 """, (rs, rowNum) -> mapExecutionRecord(rs), - executionId); + tenantId, executionId); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } @@ -219,11 +222,11 @@ public class ClickHouseExecutionStore implements ExecutionStore { resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions - WHERE tenant_id = 'default' AND execution_id = ? + WHERE tenant_id = ? AND execution_id = ? ORDER BY seq """, (rs, rowNum) -> mapProcessorRecord(rs), - executionId); + tenantId, executionId); } @Override @@ -238,11 +241,11 @@ public class ClickHouseExecutionStore implements ExecutionStore { resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions - WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ? + WHERE tenant_id = ? AND execution_id = ? AND processor_id = ? LIMIT 1 """, (rs, rowNum) -> mapProcessorRecord(rs), - executionId, processorId); + tenantId, executionId, processorId); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } @@ -258,11 +261,11 @@ public class ClickHouseExecutionStore implements ExecutionStore { resolved_endpoint_uri, circuit_breaker_state, fallback_triggered, filter_matched, duplicate_message FROM processor_executions - WHERE tenant_id = 'default' AND execution_id = ? AND seq = ? + WHERE tenant_id = ? AND execution_id = ? AND seq = ? LIMIT 1 """, (rs, rowNum) -> mapProcessorRecord(rs), - executionId, seq); + tenantId, executionId, seq); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java index 6ecc1174..ac22f724 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStore.java @@ -13,9 +13,11 @@ import java.util.Map; public class ClickHouseMetricsQueryStore implements MetricsQueryStore { + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { + public ClickHouseMetricsQueryStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @@ -41,7 +43,8 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore { metric_name, avg(metric_value) AS avg_value FROM agent_metrics - WHERE instance_id = ? + WHERE tenant_id = ? + AND instance_id = ? AND collected_at >= ? AND collected_at < ? AND metric_name IN (%s) @@ -50,6 +53,7 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore { """.formatted(intervalSeconds, placeholders); List params = new ArrayList<>(); + params.add(tenantId); params.add(instanceId); params.add(java.sql.Timestamp.from(from)); params.add(java.sql.Timestamp.from(to)); diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java index ebbd941c..48b271dc 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseMetricsStore.java @@ -11,9 +11,11 @@ import java.util.Map; public class ClickHouseMetricsStore implements MetricsStore { + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseMetricsStore(JdbcTemplate jdbc) { + public ClickHouseMetricsStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @@ -22,10 +24,11 @@ public class ClickHouseMetricsStore implements MetricsStore { if (snapshots.isEmpty()) return; jdbc.batchUpdate(""" - INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags, collected_at) - VALUES (?, ?, ?, ?, ?) + INSERT INTO agent_metrics (tenant_id, instance_id, metric_name, metric_value, tags, collected_at) + VALUES (?, ?, ?, ?, ?, ?) """, snapshots.stream().map(s -> new Object[]{ + tenantId, s.instanceId(), s.metricName(), s.metricValue(), diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java index f6a95561..8a317f3d 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseStatsStore.java @@ -31,11 +31,11 @@ import java.util.Map; */ public class ClickHouseStatsStore implements StatsStore { - private static final String TENANT = "default"; - + private final String tenantId; private final JdbcTemplate jdbc; - public ClickHouseStatsStore(JdbcTemplate jdbc) { + public ClickHouseStatsStore(String tenantId, JdbcTemplate jdbc) { + this.tenantId = tenantId; this.jdbc = jdbc; } @@ -117,7 +117,7 @@ public class ClickHouseStatsStore implements StatsStore { List params = new ArrayList<>(); params.add(thresholdMs); - params.add(TENANT); + params.add(tenantId); params.add(Timestamp.from(from)); params.add(Timestamp.from(to)); if (applicationId != null) { @@ -149,7 +149,7 @@ public class ClickHouseStatsStore implements StatsStore { jdbc.query(sql, (rs) -> { result.put(rs.getString("application_id"), new long[]{rs.getLong("compliant"), rs.getLong("total")}); - }, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to)); + }, defaultThresholdMs, tenantId, Timestamp.from(from), Timestamp.from(to)); return result; } @@ -167,7 +167,7 @@ public class ClickHouseStatsStore implements StatsStore { jdbc.query(sql, (rs) -> { result.put(rs.getString("route_id"), new long[]{rs.getLong("compliant"), rs.getLong("total")}); - }, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationId); + }, thresholdMs, tenantId, Timestamp.from(from), Timestamp.from(to), applicationId); return result; } @@ -223,13 +223,13 @@ public class ClickHouseStatsStore implements StatsStore { "ORDER BY c.cnt DESC"; List fullParams = new ArrayList<>(); - fullParams.add(TENANT); + fullParams.add(tenantId); fullParams.addAll(params); fullParams.add(limit); fullParams.add(Timestamp.from(fiveMinAgo)); fullParams.add(Timestamp.from(tenMinAgo)); fullParams.add(Timestamp.from(fiveMinAgo)); - fullParams.add(TENANT); + fullParams.add(tenantId); fullParams.addAll(params); return jdbc.query(sql, (rs, rowNum) -> { @@ -253,7 +253,7 @@ public class ClickHouseStatsStore implements StatsStore { "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?"; List params = new ArrayList<>(); - params.add(TENANT); + params.add(tenantId); params.add(Timestamp.from(from)); params.add(Timestamp.from(to)); if (applicationId != null) { @@ -275,7 +275,7 @@ public class ClickHouseStatsStore implements StatsStore { "countMerge(total_count) AS total_count, " + "countIfMerge(failed_count) AS failed_count " + "FROM " + view + - " WHERE tenant_id = " + lit(TENANT) + + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(from) + " AND bucket < " + lit(to); if (applicationId != null) { @@ -327,7 +327,7 @@ public class ClickHouseStatsStore implements StatsStore { "quantileMerge(0.99)(p99_duration) AS p99_duration, " + runningCol + " AS active_count " + "FROM " + view + - " WHERE tenant_id = " + lit(TENANT) + + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(rangeFrom) + " AND bucket < " + lit(rangeTo); for (Filter f : filters) { @@ -413,7 +413,7 @@ public class ClickHouseStatsStore implements StatsStore { "quantileMerge(0.99)(p99_duration) AS p99_duration, " + runningCol + " AS active_count " + "FROM " + view + - " WHERE tenant_id = " + lit(TENANT) + + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(from) + " AND bucket < " + lit(to); for (Filter f : filters) { @@ -453,7 +453,7 @@ public class ClickHouseStatsStore implements StatsStore { "quantileMerge(0.99)(p99_duration) AS p99_duration, " + "countIfMerge(running_count) AS active_count " + "FROM " + view + - " WHERE tenant_id = " + lit(TENANT) + + " WHERE tenant_id = " + lit(tenantId) + " AND bucket >= " + lit(from) + " AND bucket < " + lit(to); for (Filter f : filters) { @@ -499,7 +499,7 @@ public class ClickHouseStatsStore implements StatsStore { rs.getLong("total_count"), rs.getLong("failed_count"), (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), rs.getLong("active_count") - }, TENANT, ts(from), ts(to), routeId, processorType); + }, tenantId, ts(from), ts(to), routeId, processorType); if (!currentResult.isEmpty()) { long[] r = currentResult.get(0); totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3]; @@ -511,7 +511,7 @@ public class ClickHouseStatsStore implements StatsStore { var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ rs.getLong("total_count"), rs.getLong("failed_count"), (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration") - }, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType); + }, tenantId, ts(prevFrom), ts(prevTo), routeId, processorType); if (!prevResult.isEmpty()) { long[] r = prevResult.get(0); prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; @@ -520,7 +520,7 @@ public class ClickHouseStatsStore implements StatsStore { Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); long totalToday = 0; var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"), - TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType); + tenantId, ts(todayStart), ts(Instant.now()), routeId, processorType); if (!todayResult.isEmpty()) totalToday = todayResult.get(0); return new ExecutionStats( @@ -555,7 +555,7 @@ public class ClickHouseStatsStore implements StatsStore { rs.getLong("total_count"), rs.getLong("failed_count"), (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), rs.getLong("active_count") - ), TENANT, ts(from), ts(to), routeId, processorType); + ), tenantId, ts(from), ts(to), routeId, processorType); return new StatsTimeseries(buckets); } diff --git a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUsageTracker.java b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUsageTracker.java index 4de9165d..efabdbc5 100644 --- a/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUsageTracker.java +++ b/cameleer3-server-app/src/main/java/com/cameleer3/server/app/storage/ClickHouseUsageTracker.java @@ -17,10 +17,12 @@ public class ClickHouseUsageTracker implements UsageTracker { private static final Logger log = LoggerFactory.getLogger(ClickHouseUsageTracker.class); + private final String tenantId; private final JdbcTemplate jdbc; private final WriteBuffer buffer; - public ClickHouseUsageTracker(JdbcTemplate jdbc, WriteBuffer buffer) { + public ClickHouseUsageTracker(String tenantId, JdbcTemplate jdbc, WriteBuffer buffer) { + this.tenantId = tenantId; this.jdbc = jdbc; this.buffer = buffer; } @@ -35,11 +37,12 @@ public class ClickHouseUsageTracker implements UsageTracker { if (batch.isEmpty()) return; jdbc.batchUpdate(""" - INSERT INTO usage_events (timestamp, username, method, path, normalized, + INSERT INTO usage_events (tenant_id, timestamp, username, method, path, normalized, status_code, duration_ms, query_params) - VALUES (?, ?, ?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, batch.stream().map(e -> new Object[]{ + tenantId, Timestamp.from(e.timestamp()), e.username(), e.method(), @@ -59,9 +62,10 @@ public class ClickHouseUsageTracker implements UsageTracker { count() AS cnt, avg(duration_ms) AS avg_dur FROM usage_events - WHERE timestamp >= ? AND timestamp < ? + WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ? """); List params = new ArrayList<>(); + params.add(tenantId); params.add(Timestamp.from(from)); params.add(Timestamp.from(to)); @@ -80,13 +84,13 @@ public class ClickHouseUsageTracker implements UsageTracker { String sql = """ SELECT username AS key, count() AS cnt, avg(duration_ms) AS avg_dur FROM usage_events - WHERE timestamp >= ? AND timestamp < ? + WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ? GROUP BY key ORDER BY cnt DESC LIMIT 100 """; return jdbc.query(sql, (rs, i) -> new UsageStats( rs.getString("key"), rs.getLong("cnt"), rs.getLong("avg_dur")), - Timestamp.from(from), Timestamp.from(to)); + tenantId, Timestamp.from(from), Timestamp.from(to)); } public List queryByHour(Instant from, Instant to, String username) { @@ -95,9 +99,10 @@ public class ClickHouseUsageTracker implements UsageTracker { count() AS cnt, avg(duration_ms) AS avg_dur FROM usage_events - WHERE timestamp >= ? AND timestamp < ? + WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ? """); List params = new ArrayList<>(); + params.add(tenantId); params.add(Timestamp.from(from)); params.add(Timestamp.from(to)); diff --git a/cameleer3-server-app/src/main/resources/application.yml b/cameleer3-server-app/src/main/resources/application.yml index 82e85261..119236ad 100644 --- a/cameleer3-server-app/src/main/resources/application.yml +++ b/cameleer3-server-app/src/main/resources/application.yml @@ -38,6 +38,8 @@ ingestion: flush-interval-ms: 5000 cameleer: + tenant: + id: ${CAMELEER_TENANT_ID:default} body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} indexer: debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000} diff --git a/cameleer3-server-app/src/main/resources/clickhouse/init.sql b/cameleer3-server-app/src/main/resources/clickhouse/init.sql index 5b92c4df..f6e848c9 100644 --- a/cameleer3-server-app/src/main/resources/clickhouse/init.sql +++ b/cameleer3-server-app/src/main/resources/clickhouse/init.sql @@ -6,6 +6,7 @@ CREATE TABLE IF NOT EXISTS agent_metrics ( tenant_id LowCardinality(String) DEFAULT 'default', collected_at DateTime64(3), + environment LowCardinality(String) DEFAULT 'default', instance_id LowCardinality(String), metric_name LowCardinality(String), metric_value Float64, @@ -14,7 +15,7 @@ CREATE TABLE IF NOT EXISTS agent_metrics ( ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(collected_at)) -ORDER BY (tenant_id, instance_id, metric_name, collected_at) +ORDER BY (tenant_id, collected_at, environment, instance_id, metric_name) TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; @@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS executions ( route_id LowCardinality(String), instance_id LowCardinality(String), application_id LowCardinality(String), + environment LowCardinality(String) DEFAULT 'default', status LowCardinality(String), correlation_id String DEFAULT '', exchange_id String DEFAULT '', @@ -68,7 +70,7 @@ CREATE TABLE IF NOT EXISTS executions ( ) ENGINE = ReplacingMergeTree(_version) PARTITION BY (tenant_id, toYYYYMM(start_time)) -ORDER BY (tenant_id, start_time, application_id, route_id, execution_id) +ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id) TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; @@ -85,6 +87,7 @@ CREATE TABLE IF NOT EXISTS processor_executions ( start_time DateTime64(3), route_id LowCardinality(String), application_id LowCardinality(String), + environment LowCardinality(String) DEFAULT 'default', iteration Nullable(Int32), iteration_size Nullable(Int32), status LowCardinality(String), @@ -116,7 +119,7 @@ CREATE TABLE IF NOT EXISTS processor_executions ( ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(start_time)) -ORDER BY (tenant_id, start_time, application_id, route_id, execution_id, seq) +ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id, seq) TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; @@ -127,6 +130,7 @@ SETTINGS index_granularity = 8192; CREATE TABLE IF NOT EXISTS stats_1m_all ( tenant_id LowCardinality(String), bucket DateTime, + environment LowCardinality(String) DEFAULT 'default', total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), @@ -136,13 +140,14 @@ CREATE TABLE IF NOT EXISTS stats_1m_all ( ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, bucket) +ORDER BY (tenant_id, bucket, environment) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS SELECT tenant_id, toStartOfMinute(start_time) AS bucket, + environment, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, @@ -150,7 +155,7 @@ SELECT maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions -GROUP BY tenant_id, bucket; +GROUP BY tenant_id, bucket, environment; -- stats_1m_app (per-application) @@ -158,6 +163,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_app ( tenant_id LowCardinality(String), application_id LowCardinality(String), bucket DateTime, + environment LowCardinality(String) DEFAULT 'default', total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), @@ -167,7 +173,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_app ( ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, bucket) +ORDER BY (tenant_id, bucket, environment, application_id) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS @@ -175,6 +181,7 @@ SELECT tenant_id, application_id, toStartOfMinute(start_time) AS bucket, + environment, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, @@ -182,7 +189,7 @@ SELECT maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions -GROUP BY tenant_id, application_id, bucket; +GROUP BY tenant_id, application_id, bucket, environment; -- stats_1m_route (per-route) @@ -191,6 +198,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_route ( application_id LowCardinality(String), route_id LowCardinality(String), bucket DateTime, + environment LowCardinality(String) DEFAULT 'default', total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8), @@ -200,7 +208,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_route ( ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, route_id, bucket) +ORDER BY (tenant_id, bucket, environment, application_id, route_id) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS @@ -209,6 +217,7 @@ SELECT application_id, route_id, toStartOfMinute(start_time) AS bucket, + environment, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'RUNNING') AS running_count, @@ -216,7 +225,7 @@ SELECT maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM executions -GROUP BY tenant_id, application_id, route_id, bucket; +GROUP BY tenant_id, application_id, route_id, bucket, environment; -- stats_1m_processor (per-processor-type) @@ -225,6 +234,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor ( application_id LowCardinality(String), processor_type LowCardinality(String), bucket DateTime, + environment LowCardinality(String) DEFAULT 'default', total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), @@ -233,7 +243,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor ( ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, processor_type, bucket) +ORDER BY (tenant_id, bucket, environment, application_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS @@ -242,13 +252,14 @@ SELECT application_id, processor_type, toStartOfMinute(start_time) AS bucket, + environment, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM processor_executions -GROUP BY tenant_id, application_id, processor_type, bucket; +GROUP BY tenant_id, application_id, processor_type, bucket, environment; -- stats_1m_processor_detail (per-processor-id) @@ -259,6 +270,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( processor_id String, processor_type LowCardinality(String), bucket DateTime, + environment LowCardinality(String) DEFAULT 'default', total_count AggregateFunction(count), failed_count AggregateFunction(countIf, UInt8), duration_sum AggregateFunction(sum, Nullable(Int64)), @@ -267,7 +279,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail ( ) ENGINE = AggregatingMergeTree() PARTITION BY (tenant_id, toYYYYMM(bucket)) -ORDER BY (tenant_id, application_id, route_id, processor_id, processor_type, bucket) +ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type) TTL bucket + INTERVAL 365 DAY DELETE; CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS @@ -278,13 +290,14 @@ SELECT processor_id, processor_type, toStartOfMinute(start_time) AS bucket, + environment, countState() AS total_count, countIfState(status = 'FAILED') AS failed_count, sumState(duration_ms) AS duration_sum, maxState(duration_ms) AS duration_max, quantileState(0.99)(duration_ms) AS p99_duration FROM processor_executions -GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket; +GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment; -- ── Route Diagrams ────────────────────────────────────────────────────── @@ -294,11 +307,12 @@ CREATE TABLE IF NOT EXISTS route_diagrams ( route_id LowCardinality(String), instance_id LowCardinality(String), application_id LowCardinality(String), + environment LowCardinality(String) DEFAULT 'default', definition String, created_at DateTime64(3) DEFAULT now64(3) ) ENGINE = ReplacingMergeTree(created_at) -ORDER BY (tenant_id, content_hash) +ORDER BY (tenant_id, environment, route_id, instance_id, content_hash) SETTINGS index_granularity = 8192; -- ── Agent Events ──────────────────────────────────────────────────────── @@ -306,6 +320,7 @@ SETTINGS index_granularity = 8192; CREATE TABLE IF NOT EXISTS agent_events ( tenant_id LowCardinality(String) DEFAULT 'default', timestamp DateTime64(3) DEFAULT now64(3), + environment LowCardinality(String) DEFAULT 'default', instance_id LowCardinality(String), application_id LowCardinality(String), event_type LowCardinality(String), @@ -313,7 +328,7 @@ CREATE TABLE IF NOT EXISTS agent_events ( ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) -ORDER BY (tenant_id, application_id, instance_id, timestamp) +ORDER BY (tenant_id, timestamp, environment, instance_id) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE; -- ── Logs ──────────────────────────────────────────────────────────────── @@ -321,6 +336,7 @@ TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE; CREATE TABLE IF NOT EXISTS logs ( tenant_id LowCardinality(String) DEFAULT 'default', timestamp DateTime64(3), + environment LowCardinality(String) DEFAULT 'default', application LowCardinality(String), instance_id LowCardinality(String), level LowCardinality(String), @@ -337,14 +353,16 @@ CREATE TABLE IF NOT EXISTS logs ( ) ENGINE = MergeTree() PARTITION BY (tenant_id, toYYYYMM(timestamp)) -ORDER BY (tenant_id, application, timestamp) +ORDER BY (tenant_id, timestamp, environment, application, instance_id) TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE SETTINGS index_granularity = 8192; -- ── Usage Events ──────────────────────────────────────────────────────── CREATE TABLE IF NOT EXISTS usage_events ( + tenant_id LowCardinality(String) DEFAULT 'default', timestamp DateTime64(3) DEFAULT now64(3), + environment LowCardinality(String) DEFAULT 'default', username LowCardinality(String), method LowCardinality(String), path String, @@ -354,5 +372,5 @@ CREATE TABLE IF NOT EXISTS usage_events ( query_params String DEFAULT '' ) ENGINE = MergeTree() -ORDER BY (username, timestamp) +ORDER BY (tenant_id, timestamp, environment, username, normalized) TTL toDateTime(timestamp) + INTERVAL 90 DAY; diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/TestSecurityHelper.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/TestSecurityHelper.java index d2fb1c54..58ca0a0c 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/TestSecurityHelper.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/TestSecurityHelper.java @@ -30,7 +30,7 @@ public class TestSecurityHelper { * Registers a test agent and returns a valid JWT access token with AGENT role. */ public String registerTestAgent(String instanceId) { - agentRegistryService.register(instanceId, "test", "test-group", "1.0", List.of(), Map.of()); + agentRegistryService.register(instanceId, "test", "test-group", "default", "1.0", List.of(), Map.of()); return jwtService.createAccessToken(instanceId, "test-group", List.of("AGENT")); } diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java index 33c1435c..e5ada088 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseLogStoreIT.java @@ -42,7 +42,7 @@ class ClickHouseLogStoreIT { ClickHouseTestHelper.executeInitSql(jdbc); jdbc.execute("TRUNCATE TABLE logs"); - store = new ClickHouseLogStore(jdbc); + store = new ClickHouseLogStore("default", jdbc); } // ── Helpers ────────────────────────────────────────────────────────── diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java index 7052799e..3fcae1f0 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/search/ClickHouseSearchIndexIT.java @@ -47,15 +47,15 @@ class ClickHouseSearchIndexIT { jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE processor_executions"); - ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc); - searchIndex = new ClickHouseSearchIndex(jdbc); + ClickHouseExecutionStore store = new ClickHouseExecutionStore("default", jdbc); + searchIndex = new ClickHouseSearchIndex("default", jdbc); // Seed test data Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes MergedExecution exec1 = new MergedExecution( - "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", + "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", "default", "COMPLETED", "corr-1", "exchange-1", baseTime, baseTime.plusMillis(500), @@ -70,7 +70,7 @@ class ClickHouseSearchIndexIT { // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error MergedExecution exec2 = new MergedExecution( - "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", + "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", "default", "FAILED", "corr-2", "exchange-2", baseTime.plusSeconds(1), baseTime.plusSeconds(1).plusMillis(200), @@ -87,7 +87,7 @@ class ClickHouseSearchIndexIT { // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error MergedExecution exec3 = new MergedExecution( - "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", + "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", "default", "COMPLETED", "", "exchange-3", baseTime.plusSeconds(2), baseTime.plusSeconds(2).plusMillis(100), diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java index 30a9ae2f..74a8ff6c 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseAgentEventRepositoryIT.java @@ -39,7 +39,7 @@ class ClickHouseAgentEventRepositoryIT { ClickHouseTestHelper.executeInitSql(jdbc); jdbc.execute("TRUNCATE TABLE agent_events"); - repo = new ClickHouseAgentEventRepository(jdbc); + repo = new ClickHouseAgentEventRepository("default", jdbc); } // ── Helpers ────────────────────────────────────────────────────────────── diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java index 70bed5bf..4bb63f97 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseChunkPipelineIT.java @@ -59,13 +59,14 @@ class ClickHouseChunkPipelineIT { jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE processor_executions"); - executionStore = new ClickHouseExecutionStore(jdbc); - searchIndex = new ClickHouseSearchIndex(jdbc); + executionStore = new ClickHouseExecutionStore("default", jdbc); + searchIndex = new ClickHouseSearchIndex("default", jdbc); executionBuffer = new ArrayList<>(); processorBuffer = new ArrayList<>(); DiagramStore noOpDiagramStore = org.mockito.Mockito.mock(DiagramStore.class); - accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, noOpDiagramStore, Duration.ofMinutes(5)); + accumulator = new ChunkAccumulator("default", executionBuffer::add, processorBuffer::add, + noOpDiagramStore, Duration.ofMinutes(5), id -> "default"); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java index 5a1f876f..56f99883 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseDiagramStoreIT.java @@ -41,7 +41,7 @@ class ClickHouseDiagramStoreIT { ClickHouseTestHelper.executeInitSql(jdbc); jdbc.execute("TRUNCATE TABLE route_diagrams"); - store = new ClickHouseDiagramStore(jdbc); + store = new ClickHouseDiagramStore("default", jdbc); } // ── Helpers ────────────────────────────────────────────────────────── diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java index ed3ef3cd..6174a47d 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionReadIT.java @@ -47,7 +47,7 @@ class ClickHouseExecutionReadIT { jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE processor_executions"); - store = new ClickHouseExecutionStore(jdbc); + store = new ClickHouseExecutionStore("default", jdbc); detailService = new DetailService(store); } @@ -55,7 +55,7 @@ class ClickHouseExecutionReadIT { private MergedExecution minimalExecution(String executionId) { return new MergedExecution( - "default", 1L, executionId, "route-a", "agent-1", "my-app", + "default", 1L, executionId, "route-a", "agent-1", "my-app", "default", "COMPLETED", "corr-1", "exchange-1", Instant.parse("2026-04-01T10:00:00Z"), Instant.parse("2026-04-01T10:00:01Z"), diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java index c74155c8..6ebf7481 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseExecutionStoreIT.java @@ -43,13 +43,13 @@ class ClickHouseExecutionStoreIT { jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE processor_executions"); - store = new ClickHouseExecutionStore(jdbc); + store = new ClickHouseExecutionStore("default", jdbc); } @Test void insertExecutionBatch_writesToClickHouse() { MergedExecution exec = new MergedExecution( - "default", 1L, "exec-1", "route-a", "agent-1", "my-app", + "default", 1L, "exec-1", "route-a", "agent-1", "my-app", "default", "COMPLETED", "corr-1", "exchange-1", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:01Z"), @@ -181,7 +181,7 @@ class ClickHouseExecutionStoreIT { @Test void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { MergedExecution v1 = new MergedExecution( - "default", 1L, "exec-r", "route-a", "agent-1", "my-app", + "default", 1L, "exec-r", "route-a", "agent-1", "my-app", "default", "RUNNING", "corr-1", "exchange-1", Instant.parse("2026-03-31T10:00:00Z"), null, null, @@ -194,7 +194,7 @@ class ClickHouseExecutionStoreIT { ); MergedExecution v2 = new MergedExecution( - "default", 2L, "exec-r", "route-a", "agent-1", "my-app", + "default", 2L, "exec-r", "route-a", "agent-1", "my-app", "default", "COMPLETED", "corr-1", "exchange-1", Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:05Z"), diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java index a2087e38..3103b5ed 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsQueryStoreIT.java @@ -60,7 +60,7 @@ class ClickHouseMetricsQueryStoreIT { "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); } - queryStore = new ClickHouseMetricsQueryStore(jdbc); + queryStore = new ClickHouseMetricsQueryStore("default", jdbc); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java index 5f45c432..af555db1 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseMetricsStoreIT.java @@ -51,7 +51,7 @@ class ClickHouseMetricsStoreIT { jdbc.execute("TRUNCATE TABLE agent_metrics"); - store = new ClickHouseMetricsStore(jdbc); + store = new ClickHouseMetricsStore("default", jdbc); } @Test diff --git a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java index f184ffeb..443cb9de 100644 --- a/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java +++ b/cameleer3-server-app/src/test/java/com/cameleer3/server/app/storage/ClickHouseStatsStoreIT.java @@ -75,7 +75,7 @@ class ClickHouseStatsStoreIT { System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q")); } - store = new ClickHouseStatsStore(jdbc); + store = new ClickHouseStatsStore("default", jdbc); } private void seedTestData() { diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java index e5bcceaf..5c0546ca 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentInfo.java @@ -14,6 +14,7 @@ import java.util.Map; * @param instanceId agent-provided persistent identifier * @param displayName human-readable agent name * @param applicationId application identifier (e.g., "order-service-prod") + * @param environmentId logical environment (e.g., "dev", "staging", "prod") * @param version agent software version * @param routeIds list of Camel route IDs managed by this agent * @param capabilities agent-declared capabilities (free-form) @@ -26,6 +27,7 @@ public record AgentInfo( String instanceId, String displayName, String applicationId, + String environmentId, String version, List routeIds, Map capabilities, @@ -36,33 +38,33 @@ public record AgentInfo( ) { public AgentInfo withState(AgentState newState) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities, newState, registeredAt, lastHeartbeat, staleTransitionTime); } public AgentInfo withLastHeartbeat(Instant newLastHeartbeat) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities, state, registeredAt, newLastHeartbeat, staleTransitionTime); } public AgentInfo withRegisteredAt(Instant newRegisteredAt) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities, state, newRegisteredAt, lastHeartbeat, staleTransitionTime); } public AgentInfo withStaleTransitionTime(Instant newStaleTransitionTime) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities, state, registeredAt, lastHeartbeat, newStaleTransitionTime); } public AgentInfo withCapabilities(Map newCapabilities) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, newCapabilities, + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, newCapabilities, state, registeredAt, lastHeartbeat, staleTransitionTime); } - public AgentInfo withMetadata(String displayName, String applicationId, String version, - List routeIds, Map capabilities) { - return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, + public AgentInfo withMetadata(String displayName, String applicationId, String environmentId, + String version, List routeIds, Map capabilities) { + return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities, state, registeredAt, lastHeartbeat, staleTransitionTime); } } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java index f34618fc..e079f3c4 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/agent/AgentRegistryService.java @@ -46,10 +46,10 @@ public class AgentRegistryService { * Register a new agent or re-register an existing one. * Re-registration updates metadata, transitions state to LIVE, and resets timestamps. */ - public AgentInfo register(String id, String name, String application, String version, - List routeIds, Map capabilities) { + public AgentInfo register(String id, String name, String application, String environmentId, + String version, List routeIds, Map capabilities) { Instant now = Instant.now(); - AgentInfo newAgent = new AgentInfo(id, name, application, version, + AgentInfo newAgent = new AgentInfo(id, name, application, environmentId, version, List.copyOf(routeIds), Map.copyOf(capabilities), AgentState.LIVE, now, now, null); @@ -58,13 +58,13 @@ public class AgentRegistryService { // Re-registration: update metadata, reset to LIVE log.info("Agent {} re-registering (was {})", id, existing.state()); return existing - .withMetadata(name, application, version, List.copyOf(routeIds), Map.copyOf(capabilities)) + .withMetadata(name, application, environmentId, version, List.copyOf(routeIds), Map.copyOf(capabilities)) .withState(AgentState.LIVE) .withLastHeartbeat(now) .withRegisteredAt(now) .withStaleTransitionTime(null); } - log.info("Agent {} registered (name={}, application={})", id, name, application); + log.info("Agent {} registered (name={}, application={}, env={})", id, name, application, environmentId); return newAgent; }); } diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java index 721ff86f..83575fae 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/BufferedLogEntry.java @@ -6,6 +6,8 @@ import com.cameleer3.common.model.LogEntry; * A log entry paired with its agent metadata, ready for buffered ClickHouse insertion. */ public record BufferedLogEntry( + String tenantId, + String environment, String instanceId, String applicationId, LogEntry entry diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java index bc226f58..f9986f5e 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/ChunkAccumulator.java @@ -15,6 +15,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; +import java.util.function.Function; /** * Accumulates {@link ExecutionChunk} documents and produces: @@ -26,23 +27,28 @@ import java.util.function.Consumer; public class ChunkAccumulator { private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class); - private static final String DEFAULT_TENANT = "default"; private static final ObjectMapper MAPPER = new ObjectMapper(); + private final String tenantId; private final Consumer executionSink; private final Consumer processorSink; private final DiagramStore diagramStore; private final Duration staleThreshold; + private final Function environmentResolver; private final ConcurrentHashMap pending = new ConcurrentHashMap<>(); - public ChunkAccumulator(Consumer executionSink, + public ChunkAccumulator(String tenantId, + Consumer executionSink, Consumer processorSink, DiagramStore diagramStore, - Duration staleThreshold) { + Duration staleThreshold, + Function environmentResolver) { + this.tenantId = tenantId; this.executionSink = executionSink; this.processorSink = processorSink; this.diagramStore = diagramStore; this.staleThreshold = staleThreshold; + this.environmentResolver = environmentResolver; } /** @@ -51,13 +57,16 @@ public class ChunkAccumulator { */ public void onChunk(ExecutionChunk chunk) { // 1. Push processor records immediately (append-only) + String environment = environmentResolver.apply( + chunk.getInstanceId() != null ? chunk.getInstanceId() : ""); boolean chunkHasTrace = false; if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { processorSink.accept(new ProcessorBatch( - DEFAULT_TENANT, + this.tenantId, chunk.getExchangeId(), chunk.getRouteId(), chunk.getApplicationId(), + environment, chunk.getStartTime(), chunk.getProcessors())); chunkHasTrace = chunk.getProcessors().stream() @@ -164,13 +173,16 @@ public class ChunkAccumulator { } catch (Exception e) { log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId()); } + String env = environmentResolver.apply( + envelope.getInstanceId() != null ? envelope.getInstanceId() : ""); return new MergedExecution( - DEFAULT_TENANT, + this.tenantId, 1L, envelope.getExchangeId(), envelope.getRouteId(), envelope.getInstanceId(), envelope.getApplicationId(), + env, envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING", envelope.getCorrelationId(), envelope.getExchangeId(), @@ -236,6 +248,7 @@ public class ChunkAccumulator { String executionId, String routeId, String applicationId, + String environment, Instant execStartTime, List processors ) {} diff --git a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java index 95f7f255..d8e55dee 100644 --- a/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java +++ b/cameleer3-server-core/src/main/java/com/cameleer3/server/core/ingestion/MergedExecution.java @@ -13,6 +13,7 @@ public record MergedExecution( String routeId, String instanceId, String applicationId, + String environment, String status, String correlationId, String exchangeId, diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java index 874cb4ca..1b395f03 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/agent/AgentRegistryServiceTest.java @@ -26,7 +26,7 @@ class AgentRegistryServiceTest { @Test void registerNewAgent_createsWithLiveState() { - AgentInfo agent = registry.register("agent-1", "Order Agent", "order-svc", + AgentInfo agent = registry.register("agent-1", "Order Agent", "order-svc", "default", "1.0.0", List.of("route1", "route2"), Map.of("feature", "tracing")); assertThat(agent).isNotNull(); @@ -44,10 +44,10 @@ class AgentRegistryServiceTest { @Test void reRegisterSameId_updatesMetadataAndTransitionsToLive() { - registry.register("agent-1", "Old Name", "old-group", + registry.register("agent-1", "Old Name", "old-group", "default", "1.0.0", List.of("route1"), Map.of()); - AgentInfo updated = registry.register("agent-1", "New Name", "new-group", + AgentInfo updated = registry.register("agent-1", "New Name", "new-group", "default", "2.0.0", List.of("route1", "route2"), Map.of("new", "cap")); assertThat(updated.instanceId()).isEqualTo("agent-1"); @@ -62,11 +62,11 @@ class AgentRegistryServiceTest { @Test void reRegisterSameId_updatesRegisteredAtAndLastHeartbeat() { - AgentInfo first = registry.register("agent-1", "Name", "group", + AgentInfo first = registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); Instant firstRegisteredAt = first.registeredAt(); - AgentInfo second = registry.register("agent-1", "Name", "group", + AgentInfo second = registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); assertThat(second.registeredAt()).isAfterOrEqualTo(firstRegisteredAt); @@ -79,7 +79,7 @@ class AgentRegistryServiceTest { @Test void heartbeatKnownAgent_returnsTrue() { - registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); boolean result = registry.heartbeat("agent-1"); @@ -88,7 +88,7 @@ class AgentRegistryServiceTest { @Test void heartbeatKnownAgent_updatesLastHeartbeat() { - registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); Instant before = registry.findById("agent-1").lastHeartbeat(); registry.heartbeat("agent-1"); @@ -106,7 +106,7 @@ class AgentRegistryServiceTest { @Test void heartbeatStaleAgent_transitionsToLive() { - registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); registry.transitionState("agent-1", AgentState.STALE); assertThat(registry.findById("agent-1").state()).isEqualTo(AgentState.STALE); @@ -125,7 +125,7 @@ class AgentRegistryServiceTest { void liveAgentBeyondStaleThreshold_transitionsToStale() { // Use very short thresholds for test AgentRegistryService shortRegistry = new AgentRegistryService(1, 300_000, 60_000); - shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); // Wait briefly to exceed 1ms threshold try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } @@ -141,7 +141,7 @@ class AgentRegistryServiceTest { void staleAgentBeyondDeadThreshold_transitionsToDead() { // Use very short thresholds for test: 1ms stale, 1ms dead AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); - shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } shortRegistry.checkLifecycle(); // LIVE -> STALE @@ -155,7 +155,7 @@ class AgentRegistryServiceTest { @Test void deadAgentRemainsDead() { AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); - shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } shortRegistry.checkLifecycle(); @@ -171,7 +171,7 @@ class AgentRegistryServiceTest { @Test void transitionState_setsStaleTransitionTimeWhenGoingStale() { - registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); + registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of()); registry.transitionState("agent-1", AgentState.STALE); @@ -186,8 +186,8 @@ class AgentRegistryServiceTest { @Test void findAll_returnsAllAgents() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); - registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); + registry.register("agent-2", "A2", "g", "default", "1.0", List.of(), Map.of()); List all = registry.findAll(); @@ -197,8 +197,8 @@ class AgentRegistryServiceTest { @Test void findByState_filtersCorrectly() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); - registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); + registry.register("agent-2", "A2", "g", "default", "1.0", List.of(), Map.of()); registry.transitionState("agent-2", AgentState.STALE); List live = registry.findByState(AgentState.LIVE); @@ -217,7 +217,7 @@ class AgentRegistryServiceTest { @Test void findById_knownReturnsAgent() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AgentInfo result = registry.findById("agent-1"); @@ -231,7 +231,7 @@ class AgentRegistryServiceTest { @Test void addCommand_createsPendingCommand() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{\"key\":\"val\"}"); @@ -246,7 +246,7 @@ class AgentRegistryServiceTest { @Test void addCommand_notifiesEventListener() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AtomicReference received = new AtomicReference<>(); registry.setEventListener((agentId, command) -> received.set(command)); @@ -259,7 +259,7 @@ class AgentRegistryServiceTest { @Test void acknowledgeCommand_transitionsStatus() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AgentCommand cmd = registry.addCommand("agent-1", CommandType.REPLAY, "{}"); boolean acked = registry.acknowledgeCommand("agent-1", cmd.id()); @@ -269,7 +269,7 @@ class AgentRegistryServiceTest { @Test void acknowledgeCommand_unknownReturnsFalse() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); boolean acked = registry.acknowledgeCommand("agent-1", "nonexistent-cmd"); @@ -278,7 +278,7 @@ class AgentRegistryServiceTest { @Test void findPendingCommands_returnsOnlyPending() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AgentCommand cmd1 = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); AgentCommand cmd2 = registry.addCommand("agent-1", CommandType.DEEP_TRACE, "{}"); registry.acknowledgeCommand("agent-1", cmd1.id()); @@ -291,7 +291,7 @@ class AgentRegistryServiceTest { @Test void markDelivered_updatesStatus() { - registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); registry.markDelivered("agent-1", cmd.id()); @@ -305,7 +305,7 @@ class AgentRegistryServiceTest { void expireOldCommands_removesExpiredPendingCommands() { // Use 1ms expiry for test AgentRegistryService shortRegistry = new AgentRegistryService(90_000, 300_000, 1); - shortRegistry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); + shortRegistry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of()); shortRegistry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java index 475c6c2e..012df44b 100644 --- a/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java +++ b/cameleer3-server-core/src/test/java/com/cameleer3/server/core/ingestion/ChunkAccumulatorTest.java @@ -35,7 +35,8 @@ class ChunkAccumulatorTest { executionSink = new CopyOnWriteArrayList<>(); processorSink = new CopyOnWriteArrayList<>(); accumulator = new ChunkAccumulator( - executionSink::add, processorSink::add, NO_OP_DIAGRAM_STORE, Duration.ofMinutes(5)); + "default", executionSink::add, processorSink::add, + NO_OP_DIAGRAM_STORE, Duration.ofMinutes(5), id -> "default"); } @Test @@ -119,7 +120,8 @@ class ChunkAccumulatorTest { @Test void staleExchange_flushedBySweep() throws Exception { ChunkAccumulator staleAccumulator = new ChunkAccumulator( - executionSink::add, processorSink::add, NO_OP_DIAGRAM_STORE, Duration.ofMillis(1)); + "default", executionSink::add, processorSink::add, + NO_OP_DIAGRAM_STORE, Duration.ofMillis(1), id -> "default"); ExecutionChunk c = chunk("ex-3", "RUNNING", Instant.parse("2026-03-31T10:00:00Z"), diff --git a/deploy/base/server.yaml b/deploy/base/server.yaml index 6f8fc23b..9cde2a3c 100644 --- a/deploy/base/server.yaml +++ b/deploy/base/server.yaml @@ -99,6 +99,8 @@ spec: value: "clickhouse" - name: CAMELEER_STORAGE_EXECUTIONS value: "clickhouse" + - name: CAMELEER_TENANT_ID + value: "default" resources: requests: