feat: implement multitenancy with tenant isolation + environment support
All checks were successful
CI / cleanup-branch (push) Has been skipped
CI / build (push) Successful in 1m8s
CI / docker (push) Successful in 42s
CI / deploy-feature (push) Has been skipped
CI / deploy (push) Successful in 1m25s

Adds configurable tenant ID (CAMELEER_TENANT_ID env var, default:
"default") and environment as a first-class concept. Each server
instance serves one tenant with multiple environments.

Changes across 36 files:
- TenantProperties config bean for tenant ID injection
- AgentInfo: added environmentId field
- AgentRegistrationRequest: added environmentId field
- All 9 ClickHouse stores: inject tenant ID, replace hardcoded
  "default" constant, add environment to writes/reads
- ChunkAccumulator: configurable tenant ID + environment resolver
- MergedExecution/ProcessorBatch/BufferedLogEntry: added environment
- ClickHouse init.sql: added environment column to all tables,
  updated ORDER BY (tenant→time→env→app), added tenant_id to
  usage_events, updated all MV GROUP BY clauses
- Controllers: pass environmentId through registration/auto-heal
- K8s deploy: added CAMELEER_TENANT_ID env var
- All tests updated for new signatures

Closes #123

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
hsiegeln
2026-04-04 15:00:18 +02:00
parent ee7226cf1c
commit a188308ec5
36 changed files with 310 additions and 188 deletions

View File

@@ -10,6 +10,8 @@ import com.cameleer3.server.app.storage.ClickHouseStatsStore;
import com.cameleer3.server.core.admin.AuditRepository; import com.cameleer3.server.core.admin.AuditRepository;
import com.cameleer3.server.core.admin.AuditService; import com.cameleer3.server.core.admin.AuditService;
import com.cameleer3.server.core.agent.AgentEventRepository; import com.cameleer3.server.core.agent.AgentEventRepository;
import com.cameleer3.server.core.agent.AgentInfo;
import com.cameleer3.server.core.agent.AgentRegistryService;
import com.cameleer3.server.core.detail.DetailService; import com.cameleer3.server.core.detail.DetailService;
import com.cameleer3.server.core.indexing.SearchIndexer; import com.cameleer3.server.core.indexing.SearchIndexer;
import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler; import com.cameleer3.server.app.ingestion.ExecutionFlushScheduler;
@@ -63,34 +65,45 @@ public class StorageBeanConfig {
@Bean @Bean
public MetricsStore clickHouseMetricsStore( public MetricsStore clickHouseMetricsStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseMetricsStore(clickHouseJdbc); return new ClickHouseMetricsStore(tenantProperties.getId(), clickHouseJdbc);
} }
@Bean @Bean
public MetricsQueryStore clickHouseMetricsQueryStore( public MetricsQueryStore clickHouseMetricsQueryStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseMetricsQueryStore(clickHouseJdbc); return new ClickHouseMetricsQueryStore(tenantProperties.getId(), clickHouseJdbc);
} }
// ── Execution Store ────────────────────────────────────────────────── // ── Execution Store ──────────────────────────────────────────────────
@Bean @Bean
public ClickHouseExecutionStore clickHouseExecutionStore( public ClickHouseExecutionStore clickHouseExecutionStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseExecutionStore(clickHouseJdbc); return new ClickHouseExecutionStore(tenantProperties.getId(), clickHouseJdbc);
} }
@Bean @Bean
public ChunkAccumulator chunkAccumulator( public ChunkAccumulator chunkAccumulator(
TenantProperties tenantProperties,
WriteBuffer<MergedExecution> executionBuffer, WriteBuffer<MergedExecution> executionBuffer,
WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer, WriteBuffer<ChunkAccumulator.ProcessorBatch> processorBatchBuffer,
DiagramStore diagramStore) { DiagramStore diagramStore,
AgentRegistryService registryService) {
return new ChunkAccumulator( return new ChunkAccumulator(
tenantProperties.getId(),
executionBuffer::offerOrWarn, executionBuffer::offerOrWarn,
processorBatchBuffer::offerOrWarn, processorBatchBuffer::offerOrWarn,
diagramStore, diagramStore,
java.time.Duration.ofMinutes(5)); java.time.Duration.ofMinutes(5),
instanceId -> {
AgentInfo agent = registryService.findById(instanceId);
return agent != null && agent.environmentId() != null
? agent.environmentId() : "default";
});
} }
@Bean @Bean
@@ -108,40 +121,45 @@ public class StorageBeanConfig {
@Bean @Bean
public SearchIndex clickHouseSearchIndex( public SearchIndex clickHouseSearchIndex(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseSearchIndex(clickHouseJdbc); return new ClickHouseSearchIndex(tenantProperties.getId(), clickHouseJdbc);
} }
// ── ClickHouse Stats Store ───────────────────────────────────────── // ── ClickHouse Stats Store ─────────────────────────────────────────
@Bean @Bean
public StatsStore clickHouseStatsStore( public StatsStore clickHouseStatsStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseStatsStore(clickHouseJdbc); return new ClickHouseStatsStore(tenantProperties.getId(), clickHouseJdbc);
} }
// ── ClickHouse Diagram Store ────────────────────────────────────── // ── ClickHouse Diagram Store ──────────────────────────────────────
@Bean @Bean
public DiagramStore clickHouseDiagramStore( public DiagramStore clickHouseDiagramStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseDiagramStore(clickHouseJdbc); return new ClickHouseDiagramStore(tenantProperties.getId(), clickHouseJdbc);
} }
// ── ClickHouse Agent Event Repository ───────────────────────────── // ── ClickHouse Agent Event Repository ─────────────────────────────
@Bean @Bean
public AgentEventRepository clickHouseAgentEventRepository( public AgentEventRepository clickHouseAgentEventRepository(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseAgentEventRepository(clickHouseJdbc); return new ClickHouseAgentEventRepository(tenantProperties.getId(), clickHouseJdbc);
} }
// ── ClickHouse Log Store ────────────────────────────────────────── // ── ClickHouse Log Store ──────────────────────────────────────────
@Bean @Bean
public ClickHouseLogStore clickHouseLogStore( public ClickHouseLogStore clickHouseLogStore(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseLogStore(clickHouseJdbc); return new ClickHouseLogStore(tenantProperties.getId(), clickHouseJdbc);
} }
// ── Usage Analytics ────────────────────────────────────────────── // ── Usage Analytics ──────────────────────────────────────────────
@@ -149,8 +167,9 @@ public class StorageBeanConfig {
@Bean @Bean
@ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true") @ConditionalOnProperty(name = "clickhouse.enabled", havingValue = "true")
public ClickHouseUsageTracker clickHouseUsageTracker( public ClickHouseUsageTracker clickHouseUsageTracker(
TenantProperties tenantProperties,
@Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) { @Qualifier("clickHouseJdbcTemplate") JdbcTemplate clickHouseJdbc) {
return new ClickHouseUsageTracker(clickHouseJdbc, return new ClickHouseUsageTracker(tenantProperties.getId(), clickHouseJdbc,
new com.cameleer3.server.core.ingestion.WriteBuffer<>(5000)); new com.cameleer3.server.core.ingestion.WriteBuffer<>(5000));
} }

View File

@@ -0,0 +1,19 @@
package com.cameleer3.server.app.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Tenant configuration bound from {@code cameleer.tenant.*} properties.
 * The {@code CAMELEER_TENANT_ID} environment variable maps (via Spring's
 * relaxed binding) to {@code cameleer.tenant.id}. Each server instance
 * serves exactly one tenant.
 */
@Component
@ConfigurationProperties(prefix = "cameleer.tenant")
public class TenantProperties {

    /** Tenant identifier used by all ClickHouse stores; defaults to "default". */
    private String id = "default";

    /** @return the configured tenant ID, never {@code null} or blank */
    public String getId() {
        return id;
    }

    /**
     * Sets the tenant ID. A {@code null} or blank value (e.g. an empty
     * CAMELEER_TENANT_ID env var) falls back to "default" instead of
     * propagating an empty tenant into every store write/read.
     *
     * @param id tenant identifier from configuration; may be null/blank
     */
    public void setId(String id) {
        this.id = (id == null || id.isBlank()) ? "default" : id;
    }
}

View File

@@ -115,11 +115,13 @@ public class AgentRegistrationController {
} }
String application = request.applicationId() != null ? request.applicationId() : "default"; String application = request.applicationId() != null ? request.applicationId() : "default";
String environmentId = request.environmentId() != null ? request.environmentId() : "default";
List<String> routeIds = request.routeIds() != null ? request.routeIds() : List.of(); List<String> routeIds = request.routeIds() != null ? request.routeIds() : List.of();
var capabilities = request.capabilities() != null ? request.capabilities() : Collections.<String, Object>emptyMap(); var capabilities = request.capabilities() != null ? request.capabilities() : Collections.<String, Object>emptyMap();
AgentInfo agent = registryService.register( AgentInfo agent = registryService.register(
request.instanceId(), request.displayName(), application, request.version(), routeIds, capabilities); request.instanceId(), request.displayName(), application, environmentId,
request.version(), routeIds, capabilities);
log.info("Agent registered: {} (name={}, application={})", request.instanceId(), request.displayName(), application); log.info("Agent registered: {} (name={}, application={})", request.instanceId(), request.displayName(), application);
agentEventService.recordEvent(request.instanceId(), application, "REGISTERED", agentEventService.recordEvent(request.instanceId(), application, "REGISTERED",
@@ -210,7 +212,7 @@ public class AgentRegistrationController {
if (jwtResult != null) { if (jwtResult != null) {
String application = jwtResult.application() != null ? jwtResult.application() : "default"; String application = jwtResult.application() != null ? jwtResult.application() : "default";
Map<String, Object> caps = capabilities != null ? capabilities : Map.of(); Map<String, Object> caps = capabilities != null ? capabilities : Map.of();
registryService.register(id, id, application, "unknown", registryService.register(id, id, application, "default", "unknown",
List.of(), caps); List.of(), caps);
registryService.heartbeat(id); registryService.heartbeat(id);
log.info("Auto-registered agent {} (app={}) from heartbeat after server restart", id, application); log.info("Auto-registered agent {} (app={}) from heartbeat after server restart", id, application);

View File

@@ -67,7 +67,7 @@ public class AgentSseController {
JwtAuthenticationFilter.JWT_RESULT_ATTR); JwtAuthenticationFilter.JWT_RESULT_ATTR);
if (jwtResult != null) { if (jwtResult != null) {
String application = jwtResult.application() != null ? jwtResult.application() : "default"; String application = jwtResult.application() != null ? jwtResult.application() : "default";
registryService.register(id, id, application, "unknown", List.of(), Map.of()); registryService.register(id, id, application, "default", "unknown", List.of(), Map.of());
log.info("Auto-registered agent {} (app={}) from SSE connect after server restart", id, application); log.info("Auto-registered agent {} (app={}) from SSE connect after server restart", id, application);
} else { } else {
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id); throw new ResponseStatusException(HttpStatus.NOT_FOUND, "Agent not found: " + id);

View File

@@ -8,6 +8,7 @@ import com.cameleer3.server.core.agent.AgentRegistryService;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse; import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
import com.cameleer3.server.app.config.TenantProperties;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.http.ResponseEntity; import org.springframework.http.ResponseEntity;
@@ -27,11 +28,14 @@ public class LogIngestionController {
private final WriteBuffer<BufferedLogEntry> logBuffer; private final WriteBuffer<BufferedLogEntry> logBuffer;
private final AgentRegistryService registryService; private final AgentRegistryService registryService;
private final TenantProperties tenantProperties;
public LogIngestionController(WriteBuffer<BufferedLogEntry> logBuffer, public LogIngestionController(WriteBuffer<BufferedLogEntry> logBuffer,
AgentRegistryService registryService) { AgentRegistryService registryService,
TenantProperties tenantProperties) {
this.logBuffer = logBuffer; this.logBuffer = logBuffer;
this.registryService = registryService; this.registryService = registryService;
this.tenantProperties = tenantProperties;
} }
@PostMapping("/logs") @PostMapping("/logs")
@@ -44,8 +48,10 @@ public class LogIngestionController {
if (batch.getEntries() != null && !batch.getEntries().isEmpty()) { if (batch.getEntries() != null && !batch.getEntries().isEmpty()) {
log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId); log.debug("Received {} log entries from instance={}, app={}", batch.getEntries().size(), instanceId, applicationId);
String environment = resolveEnvironment(instanceId);
for (var entry : batch.getEntries()) { for (var entry : batch.getEntries()) {
logBuffer.offerOrWarn(new BufferedLogEntry(instanceId, applicationId, entry)); logBuffer.offerOrWarn(new BufferedLogEntry(
tenantProperties.getId(), environment, instanceId, applicationId, entry));
} }
} }
@@ -61,4 +67,9 @@ public class LogIngestionController {
AgentInfo agent = registryService.findById(instanceId); AgentInfo agent = registryService.findById(instanceId);
return agent != null ? agent.applicationId() : ""; return agent != null ? agent.applicationId() : "";
} }
/**
 * Resolves the environment ID for a registered agent instance.
 * Falls back to "default" when the agent is unknown or has no environment set.
 */
private String resolveEnvironment(String instanceId) {
    AgentInfo agent = registryService.findById(instanceId);
    if (agent == null || agent.environmentId() == null) {
        return "default";
    }
    return agent.environmentId();
}
} }

View File

@@ -11,6 +11,7 @@ public record AgentRegistrationRequest(
@NotNull String instanceId, @NotNull String instanceId,
@NotNull String displayName, @NotNull String displayName,
@Schema(defaultValue = "default") String applicationId, @Schema(defaultValue = "default") String applicationId,
@Schema(defaultValue = "default") String environmentId,
String version, String version,
List<String> routeIds, List<String> routeIds,
Map<String, Object> capabilities Map<String, Object> capabilities

View File

@@ -29,12 +29,13 @@ import java.util.Map;
public class ClickHouseLogStore implements LogIndex { public class ClickHouseLogStore implements LogIndex {
private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class); private static final Logger log = LoggerFactory.getLogger(ClickHouseLogStore.class);
private static final String TENANT = "default";
private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT; private static final DateTimeFormatter ISO_FMT = DateTimeFormatter.ISO_INSTANT;
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseLogStore(JdbcTemplate jdbc) { public ClickHouseLogStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@@ -46,23 +47,24 @@ public class ClickHouseLogStore implements LogIndex {
String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> { jdbc.batchUpdate(sql, entries, entries.size(), (ps, entry) -> {
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
ps.setTimestamp(1, Timestamp.from(ts)); ps.setString(1, tenantId);
ps.setString(2, applicationId); ps.setTimestamp(2, Timestamp.from(ts));
ps.setString(3, instanceId); ps.setString(3, applicationId);
ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); ps.setString(4, instanceId);
ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); ps.setString(5, entry.getLevel() != null ? entry.getLevel() : "");
ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); ps.setString(6, entry.getLoggerName() != null ? entry.getLoggerName() : "");
ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); ps.setString(7, entry.getMessage() != null ? entry.getMessage() : "");
ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); ps.setString(8, entry.getThreadName() != null ? entry.getThreadName() : "");
ps.setString(9, entry.getStackTrace() != null ? entry.getStackTrace() : "");
Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap();
String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
ps.setString(9, exchangeId); ps.setString(10, exchangeId);
ps.setObject(10, mdc); ps.setObject(11, mdc);
}); });
log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId); log.debug("Indexed {} log entries for instance={}, app={}", entries.size(), instanceId, applicationId);
@@ -71,26 +73,28 @@ public class ClickHouseLogStore implements LogIndex {
public void insertBufferedBatch(List<BufferedLogEntry> entries) { public void insertBufferedBatch(List<BufferedLogEntry> entries) {
if (entries.isEmpty()) return; if (entries.isEmpty()) return;
String sql = "INSERT INTO logs (tenant_id, timestamp, application, instance_id, level, " + String sql = "INSERT INTO logs (tenant_id, environment, timestamp, application, instance_id, level, " +
"logger_name, message, thread_name, stack_trace, exchange_id, mdc) " + "logger_name, message, thread_name, stack_trace, exchange_id, mdc) " +
"VALUES ('default', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> { jdbc.batchUpdate(sql, entries, entries.size(), (ps, ble) -> {
LogEntry entry = ble.entry(); LogEntry entry = ble.entry();
Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now(); Instant ts = entry.getTimestamp() != null ? entry.getTimestamp() : Instant.now();
ps.setTimestamp(1, Timestamp.from(ts)); ps.setString(1, ble.tenantId() != null ? ble.tenantId() : tenantId);
ps.setString(2, ble.applicationId()); ps.setString(2, ble.environment() != null ? ble.environment() : "default");
ps.setString(3, ble.instanceId()); ps.setTimestamp(3, Timestamp.from(ts));
ps.setString(4, entry.getLevel() != null ? entry.getLevel() : ""); ps.setString(4, ble.applicationId());
ps.setString(5, entry.getLoggerName() != null ? entry.getLoggerName() : ""); ps.setString(5, ble.instanceId());
ps.setString(6, entry.getMessage() != null ? entry.getMessage() : ""); ps.setString(6, entry.getLevel() != null ? entry.getLevel() : "");
ps.setString(7, entry.getThreadName() != null ? entry.getThreadName() : ""); ps.setString(7, entry.getLoggerName() != null ? entry.getLoggerName() : "");
ps.setString(8, entry.getStackTrace() != null ? entry.getStackTrace() : ""); ps.setString(8, entry.getMessage() != null ? entry.getMessage() : "");
ps.setString(9, entry.getThreadName() != null ? entry.getThreadName() : "");
ps.setString(10, entry.getStackTrace() != null ? entry.getStackTrace() : "");
Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap(); Map<String, String> mdc = entry.getMdc() != null ? entry.getMdc() : Collections.emptyMap();
String exchangeId = mdc.getOrDefault("camel.exchangeId", ""); String exchangeId = mdc.getOrDefault("camel.exchangeId", "");
ps.setString(9, exchangeId); ps.setString(11, exchangeId);
ps.setObject(10, mdc); ps.setObject(12, mdc);
}); });
log.debug("Flushed {} buffered log entries to ClickHouse", entries.size()); log.debug("Flushed {} buffered log entries to ClickHouse", entries.size());
@@ -101,7 +105,8 @@ public class ClickHouseLogStore implements LogIndex {
// Build shared WHERE conditions (used by both data and count queries) // Build shared WHERE conditions (used by both data and count queries)
List<String> baseConditions = new ArrayList<>(); List<String> baseConditions = new ArrayList<>();
List<Object> baseParams = new ArrayList<>(); List<Object> baseParams = new ArrayList<>();
baseConditions.add("tenant_id = 'default'"); baseConditions.add("tenant_id = ?");
baseParams.add(tenantId);
if (request.application() != null && !request.application().isEmpty()) { if (request.application() != null && !request.application().isEmpty()) {
baseConditions.add("application = ?"); baseConditions.add("application = ?");

View File

@@ -44,9 +44,11 @@ public class ClickHouseSearchIndex implements SearchIndex {
"applicationId", "application_id" "applicationId", "application_id"
); );
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseSearchIndex(JdbcTemplate jdbc) { public ClickHouseSearchIndex(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@@ -118,7 +120,8 @@ public class ClickHouseSearchIndex implements SearchIndex {
private String buildWhereClause(SearchRequest request, List<Object> params) { private String buildWhereClause(SearchRequest request, List<Object> params) {
List<String> conditions = new ArrayList<>(); List<String> conditions = new ArrayList<>();
conditions.add("tenant_id = 'default'"); conditions.add("tenant_id = ?");
params.add(tenantId);
if (request.timeFrom() != null) { if (request.timeFrom() != null) {
conditions.add("start_time >= ?"); conditions.add("start_time >= ?");
@@ -186,11 +189,12 @@ public class ClickHouseSearchIndex implements SearchIndex {
conditions.add("(execution_id = ? OR correlation_id = ? OR exchange_id = ?" conditions.add("(execution_id = ? OR correlation_id = ? OR exchange_id = ?"
+ " OR _search_text LIKE ? OR execution_id IN (" + " OR _search_text LIKE ? OR execution_id IN ("
+ "SELECT DISTINCT execution_id FROM processor_executions " + "SELECT DISTINCT execution_id FROM processor_executions "
+ "WHERE tenant_id = 'default' AND _search_text LIKE ?))"); + "WHERE tenant_id = ? AND _search_text LIKE ?))");
params.add(term); params.add(term);
params.add(term); params.add(term);
params.add(term); params.add(term);
params.add(likeTerm); params.add(likeTerm);
params.add(tenantId);
params.add(likeTerm); params.add(likeTerm);
} }
@@ -199,7 +203,8 @@ public class ClickHouseSearchIndex implements SearchIndex {
String likeTerm = "%" + escapeLike(request.textInBody()) + "%"; String likeTerm = "%" + escapeLike(request.textInBody()) + "%";
conditions.add("execution_id IN (" conditions.add("execution_id IN ("
+ "SELECT DISTINCT execution_id FROM processor_executions " + "SELECT DISTINCT execution_id FROM processor_executions "
+ "WHERE tenant_id = 'default' AND (input_body LIKE ? OR output_body LIKE ?))"); + "WHERE tenant_id = ? AND (input_body LIKE ? OR output_body LIKE ?))");
params.add(tenantId);
params.add(likeTerm); params.add(likeTerm);
params.add(likeTerm); params.add(likeTerm);
} }
@@ -209,7 +214,8 @@ public class ClickHouseSearchIndex implements SearchIndex {
String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%"; String likeTerm = "%" + escapeLike(request.textInHeaders()) + "%";
conditions.add("execution_id IN (" conditions.add("execution_id IN ("
+ "SELECT DISTINCT execution_id FROM processor_executions " + "SELECT DISTINCT execution_id FROM processor_executions "
+ "WHERE tenant_id = 'default' AND (input_headers LIKE ? OR output_headers LIKE ?))"); + "WHERE tenant_id = ? AND (input_headers LIKE ? OR output_headers LIKE ?))");
params.add(tenantId);
params.add(likeTerm); params.add(likeTerm);
params.add(likeTerm); params.add(likeTerm);
} }
@@ -219,9 +225,10 @@ public class ClickHouseSearchIndex implements SearchIndex {
String likeTerm = "%" + escapeLike(request.textInErrors()) + "%"; String likeTerm = "%" + escapeLike(request.textInErrors()) + "%";
conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN (" conditions.add("(error_message LIKE ? OR error_stacktrace LIKE ? OR execution_id IN ("
+ "SELECT DISTINCT execution_id FROM processor_executions " + "SELECT DISTINCT execution_id FROM processor_executions "
+ "WHERE tenant_id = 'default' AND (error_message LIKE ? OR error_stacktrace LIKE ?)))"); + "WHERE tenant_id = ? AND (error_message LIKE ? OR error_stacktrace LIKE ?)))");
params.add(likeTerm); params.add(likeTerm);
params.add(likeTerm); params.add(likeTerm);
params.add(tenantId);
params.add(likeTerm); params.add(likeTerm);
params.add(likeTerm); params.add(likeTerm);
} }
@@ -311,9 +318,9 @@ public class ClickHouseSearchIndex implements SearchIndex {
return jdbc.queryForList(""" return jdbc.queryForList("""
SELECT DISTINCT arrayJoin(JSONExtractKeys(attributes)) AS attr_key SELECT DISTINCT arrayJoin(JSONExtractKeys(attributes)) AS attr_key
FROM executions FINAL FROM executions FINAL
WHERE tenant_id = 'default' AND attributes != '' AND attributes != '{}' WHERE tenant_id = ? AND attributes != '' AND attributes != '{}'
ORDER BY attr_key ORDER BY attr_key
""", String.class); """, String.class, tenantId);
} catch (Exception e) { } catch (Exception e) {
log.error("Failed to query distinct attribute keys", e); log.error("Failed to query distinct attribute keys", e);
return List.of(); return List.of();

View File

@@ -17,30 +17,30 @@ import java.util.List;
*/ */
public class ClickHouseAgentEventRepository implements AgentEventRepository { public class ClickHouseAgentEventRepository implements AgentEventRepository {
private static final String TENANT = "default";
private static final String INSERT_SQL = private static final String INSERT_SQL =
"INSERT INTO agent_events (tenant_id, instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?, ?)"; "INSERT INTO agent_events (tenant_id, instance_id, application_id, event_type, detail) VALUES (?, ?, ?, ?, ?)";
private static final String SELECT_BASE = private static final String SELECT_BASE =
"SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?"; "SELECT 0 AS id, instance_id, application_id, event_type, detail, timestamp FROM agent_events WHERE tenant_id = ?";
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseAgentEventRepository(JdbcTemplate jdbc) { public ClickHouseAgentEventRepository(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@Override @Override
public void insert(String instanceId, String applicationId, String eventType, String detail) { public void insert(String instanceId, String applicationId, String eventType, String detail) {
jdbc.update(INSERT_SQL, TENANT, instanceId, applicationId, eventType, detail); jdbc.update(INSERT_SQL, tenantId, instanceId, applicationId, eventType, detail);
} }
@Override @Override
public List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit) { public List<AgentEventRecord> query(String applicationId, String instanceId, Instant from, Instant to, int limit) {
var sql = new StringBuilder(SELECT_BASE); var sql = new StringBuilder(SELECT_BASE);
var params = new ArrayList<Object>(); var params = new ArrayList<Object>();
params.add(TENANT); params.add(tenantId);
if (applicationId != null) { if (applicationId != null) {
sql.append(" AND application_id = ?"); sql.append(" AND application_id = ?");

View File

@@ -39,8 +39,6 @@ public class ClickHouseDiagramStore implements DiagramStore {
private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class); private static final Logger log = LoggerFactory.getLogger(ClickHouseDiagramStore.class);
private static final String TENANT = "default";
private static final String INSERT_SQL = """ private static final String INSERT_SQL = """
INSERT INTO route_diagrams INSERT INTO route_diagrams
(tenant_id, content_hash, route_id, instance_id, application_id, definition, created_at) (tenant_id, content_hash, route_id, instance_id, application_id, definition, created_at)
@@ -64,6 +62,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
WHERE tenant_id = ? AND application_id = ? WHERE tenant_id = ? AND application_id = ?
"""; """;
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
@@ -72,7 +71,8 @@ public class ClickHouseDiagramStore implements DiagramStore {
// contentHash → deserialized RouteGraph // contentHash → deserialized RouteGraph
private final ConcurrentHashMap<String, RouteGraph> graphCache = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, RouteGraph> graphCache = new ConcurrentHashMap<>();
public ClickHouseDiagramStore(JdbcTemplate jdbc) { public ClickHouseDiagramStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
this.objectMapper = new ObjectMapper(); this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule()); this.objectMapper.registerModule(new JavaTimeModule());
@@ -87,7 +87,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
String key = rs.getString("route_id") + "\0" + rs.getString("instance_id"); String key = rs.getString("route_id") + "\0" + rs.getString("instance_id");
hashCache.put(key, rs.getString("content_hash")); hashCache.put(key, rs.getString("content_hash"));
}, },
TENANT); tenantId);
log.info("Diagram hash cache warmed: {} entries", hashCache.size()); log.info("Diagram hash cache warmed: {} entries", hashCache.size());
} catch (Exception e) { } catch (Exception e) {
log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage()); log.warn("Failed to warm diagram hash cache — lookups will fall back to ClickHouse: {}", e.getMessage());
@@ -109,7 +109,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
String routeId = graph.getRouteId() != null ? graph.getRouteId() : ""; String routeId = graph.getRouteId() != null ? graph.getRouteId() : "";
jdbc.update(INSERT_SQL, jdbc.update(INSERT_SQL,
TENANT, tenantId,
contentHash, contentHash,
routeId, routeId,
agentId, agentId,
@@ -134,7 +134,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
return Optional.of(cached); return Optional.of(cached);
} }
List<Map<String, Object>> rows = jdbc.queryForList(SELECT_BY_HASH, TENANT, contentHash); List<Map<String, Object>> rows = jdbc.queryForList(SELECT_BY_HASH, tenantId, contentHash);
if (rows.isEmpty()) { if (rows.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
@@ -157,7 +157,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
} }
List<Map<String, Object>> rows = jdbc.queryForList( List<Map<String, Object>> rows = jdbc.queryForList(
SELECT_HASH_FOR_ROUTE, TENANT, routeId, agentId); SELECT_HASH_FOR_ROUTE, tenantId, routeId, agentId);
if (rows.isEmpty()) { if (rows.isEmpty()) {
return Optional.empty(); return Optional.empty();
} }
@@ -186,7 +186,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
"WHERE tenant_id = ? AND route_id = ? AND instance_id IN (" + placeholders + ") " + "WHERE tenant_id = ? AND route_id = ? AND instance_id IN (" + placeholders + ") " +
"ORDER BY created_at DESC LIMIT 1"; "ORDER BY created_at DESC LIMIT 1";
var params = new ArrayList<Object>(); var params = new ArrayList<Object>();
params.add(TENANT); params.add(tenantId);
params.add(routeId); params.add(routeId);
params.addAll(agentIds); params.addAll(agentIds);
List<Map<String, Object>> rows = jdbc.queryForList(sql, params.toArray()); List<Map<String, Object>> rows = jdbc.queryForList(sql, params.toArray());
@@ -200,7 +200,7 @@ public class ClickHouseDiagramStore implements DiagramStore {
public Map<String, String> findProcessorRouteMapping(String applicationId) { public Map<String, String> findProcessorRouteMapping(String applicationId) {
Map<String, String> mapping = new HashMap<>(); Map<String, String> mapping = new HashMap<>();
List<Map<String, Object>> rows = jdbc.queryForList( List<Map<String, Object>> rows = jdbc.queryForList(
SELECT_DEFINITIONS_FOR_APP, TENANT, applicationId); SELECT_DEFINITIONS_FOR_APP, tenantId, applicationId);
for (Map<String, Object> row : rows) { for (Map<String, Object> row : rows) {
String routeId = (String) row.get("route_id"); String routeId = (String) row.get("route_id");
String json = (String) row.get("definition"); String json = (String) row.get("definition");

View File

@@ -18,14 +18,16 @@ import java.util.Optional;
public class ClickHouseExecutionStore implements ExecutionStore { public class ClickHouseExecutionStore implements ExecutionStore {
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
private final ObjectMapper objectMapper; private final ObjectMapper objectMapper;
public ClickHouseExecutionStore(JdbcTemplate jdbc) { public ClickHouseExecutionStore(String tenantId, JdbcTemplate jdbc) {
this(jdbc, new ObjectMapper()); this(tenantId, jdbc, new ObjectMapper());
} }
public ClickHouseExecutionStore(JdbcTemplate jdbc, ObjectMapper objectMapper) { public ClickHouseExecutionStore(String tenantId, JdbcTemplate jdbc, ObjectMapper objectMapper) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
this.objectMapper = objectMapper; this.objectMapper = objectMapper;
} }
@@ -36,14 +38,14 @@ public class ClickHouseExecutionStore implements ExecutionStore {
jdbc.batchUpdate(""" jdbc.batchUpdate("""
INSERT INTO executions ( INSERT INTO executions (
tenant_id, _version, execution_id, route_id, instance_id, application_id, tenant_id, _version, execution_id, route_id, instance_id, application_id,
status, correlation_id, exchange_id, start_time, end_time, duration_ms, environment, status, correlation_id, exchange_id, start_time, end_time, duration_ms,
error_message, error_stacktrace, error_type, error_category, error_message, error_stacktrace, error_type, error_category,
root_cause_type, root_cause_message, diagram_content_hash, engine_level, root_cause_type, root_cause_message, diagram_content_hash, engine_level,
input_body, output_body, input_headers, output_headers, attributes, input_body, output_body, input_headers, output_headers, attributes,
trace_id, span_id, has_trace_data, is_replay, trace_id, span_id, has_trace_data, is_replay,
original_exchange_id, replay_exchange_id original_exchange_id, replay_exchange_id
) )
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
executions.stream().map(e -> new Object[]{ executions.stream().map(e -> new Object[]{
nullToEmpty(e.tenantId()), nullToEmpty(e.tenantId()),
@@ -52,6 +54,7 @@ public class ClickHouseExecutionStore implements ExecutionStore {
nullToEmpty(e.routeId()), nullToEmpty(e.routeId()),
nullToEmpty(e.instanceId()), nullToEmpty(e.instanceId()),
nullToEmpty(e.applicationId()), nullToEmpty(e.applicationId()),
nullToEmpty(e.environment()),
nullToEmpty(e.status()), nullToEmpty(e.status()),
nullToEmpty(e.correlationId()), nullToEmpty(e.correlationId()),
nullToEmpty(e.exchangeId()), nullToEmpty(e.exchangeId()),
@@ -199,11 +202,11 @@ public class ClickHouseExecutionStore implements ExecutionStore {
error_type, error_category, root_cause_type, root_cause_message, error_type, error_category, root_cause_type, root_cause_message,
trace_id, span_id, has_trace_data, is_replay trace_id, span_id, has_trace_data, is_replay
FROM executions FINAL FROM executions FINAL
WHERE tenant_id = 'default' AND execution_id = ? WHERE tenant_id = ? AND execution_id = ?
LIMIT 1 LIMIT 1
""", """,
(rs, rowNum) -> mapExecutionRecord(rs), (rs, rowNum) -> mapExecutionRecord(rs),
executionId); tenantId, executionId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
} }
@@ -219,11 +222,11 @@ public class ClickHouseExecutionStore implements ExecutionStore {
resolved_endpoint_uri, circuit_breaker_state, resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message fallback_triggered, filter_matched, duplicate_message
FROM processor_executions FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ? WHERE tenant_id = ? AND execution_id = ?
ORDER BY seq ORDER BY seq
""", """,
(rs, rowNum) -> mapProcessorRecord(rs), (rs, rowNum) -> mapProcessorRecord(rs),
executionId); tenantId, executionId);
} }
@Override @Override
@@ -238,11 +241,11 @@ public class ClickHouseExecutionStore implements ExecutionStore {
resolved_endpoint_uri, circuit_breaker_state, resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message fallback_triggered, filter_matched, duplicate_message
FROM processor_executions FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ? AND processor_id = ? WHERE tenant_id = ? AND execution_id = ? AND processor_id = ?
LIMIT 1 LIMIT 1
""", """,
(rs, rowNum) -> mapProcessorRecord(rs), (rs, rowNum) -> mapProcessorRecord(rs),
executionId, processorId); tenantId, executionId, processorId);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
} }
@@ -258,11 +261,11 @@ public class ClickHouseExecutionStore implements ExecutionStore {
resolved_endpoint_uri, circuit_breaker_state, resolved_endpoint_uri, circuit_breaker_state,
fallback_triggered, filter_matched, duplicate_message fallback_triggered, filter_matched, duplicate_message
FROM processor_executions FROM processor_executions
WHERE tenant_id = 'default' AND execution_id = ? AND seq = ? WHERE tenant_id = ? AND execution_id = ? AND seq = ?
LIMIT 1 LIMIT 1
""", """,
(rs, rowNum) -> mapProcessorRecord(rs), (rs, rowNum) -> mapProcessorRecord(rs),
executionId, seq); tenantId, executionId, seq);
return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0)); return results.isEmpty() ? Optional.empty() : Optional.of(results.get(0));
} }

View File

@@ -13,9 +13,11 @@ import java.util.Map;
public class ClickHouseMetricsQueryStore implements MetricsQueryStore { public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseMetricsQueryStore(JdbcTemplate jdbc) { public ClickHouseMetricsQueryStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@@ -41,7 +43,8 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
metric_name, metric_name,
avg(metric_value) AS avg_value avg(metric_value) AS avg_value
FROM agent_metrics FROM agent_metrics
WHERE instance_id = ? WHERE tenant_id = ?
AND instance_id = ?
AND collected_at >= ? AND collected_at >= ?
AND collected_at < ? AND collected_at < ?
AND metric_name IN (%s) AND metric_name IN (%s)
@@ -50,6 +53,7 @@ public class ClickHouseMetricsQueryStore implements MetricsQueryStore {
""".formatted(intervalSeconds, placeholders); """.formatted(intervalSeconds, placeholders);
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(tenantId);
params.add(instanceId); params.add(instanceId);
params.add(java.sql.Timestamp.from(from)); params.add(java.sql.Timestamp.from(from));
params.add(java.sql.Timestamp.from(to)); params.add(java.sql.Timestamp.from(to));

View File

@@ -11,9 +11,11 @@ import java.util.Map;
public class ClickHouseMetricsStore implements MetricsStore { public class ClickHouseMetricsStore implements MetricsStore {
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseMetricsStore(JdbcTemplate jdbc) { public ClickHouseMetricsStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@@ -22,10 +24,11 @@ public class ClickHouseMetricsStore implements MetricsStore {
if (snapshots.isEmpty()) return; if (snapshots.isEmpty()) return;
jdbc.batchUpdate(""" jdbc.batchUpdate("""
INSERT INTO agent_metrics (instance_id, metric_name, metric_value, tags, collected_at) INSERT INTO agent_metrics (tenant_id, instance_id, metric_name, metric_value, tags, collected_at)
VALUES (?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?)
""", """,
snapshots.stream().map(s -> new Object[]{ snapshots.stream().map(s -> new Object[]{
tenantId,
s.instanceId(), s.instanceId(),
s.metricName(), s.metricName(),
s.metricValue(), s.metricValue(),

View File

@@ -31,11 +31,11 @@ import java.util.Map;
*/ */
public class ClickHouseStatsStore implements StatsStore { public class ClickHouseStatsStore implements StatsStore {
private static final String TENANT = "default"; private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
public ClickHouseStatsStore(JdbcTemplate jdbc) { public ClickHouseStatsStore(String tenantId, JdbcTemplate jdbc) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
} }
@@ -117,7 +117,7 @@ public class ClickHouseStatsStore implements StatsStore {
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(thresholdMs); params.add(thresholdMs);
params.add(TENANT); params.add(tenantId);
params.add(Timestamp.from(from)); params.add(Timestamp.from(from));
params.add(Timestamp.from(to)); params.add(Timestamp.from(to));
if (applicationId != null) { if (applicationId != null) {
@@ -149,7 +149,7 @@ public class ClickHouseStatsStore implements StatsStore {
jdbc.query(sql, (rs) -> { jdbc.query(sql, (rs) -> {
result.put(rs.getString("application_id"), result.put(rs.getString("application_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")}); new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, defaultThresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to)); }, defaultThresholdMs, tenantId, Timestamp.from(from), Timestamp.from(to));
return result; return result;
} }
@@ -167,7 +167,7 @@ public class ClickHouseStatsStore implements StatsStore {
jdbc.query(sql, (rs) -> { jdbc.query(sql, (rs) -> {
result.put(rs.getString("route_id"), result.put(rs.getString("route_id"),
new long[]{rs.getLong("compliant"), rs.getLong("total")}); new long[]{rs.getLong("compliant"), rs.getLong("total")});
}, thresholdMs, TENANT, Timestamp.from(from), Timestamp.from(to), applicationId); }, thresholdMs, tenantId, Timestamp.from(from), Timestamp.from(to), applicationId);
return result; return result;
} }
@@ -223,13 +223,13 @@ public class ClickHouseStatsStore implements StatsStore {
"ORDER BY c.cnt DESC"; "ORDER BY c.cnt DESC";
List<Object> fullParams = new ArrayList<>(); List<Object> fullParams = new ArrayList<>();
fullParams.add(TENANT); fullParams.add(tenantId);
fullParams.addAll(params); fullParams.addAll(params);
fullParams.add(limit); fullParams.add(limit);
fullParams.add(Timestamp.from(fiveMinAgo)); fullParams.add(Timestamp.from(fiveMinAgo));
fullParams.add(Timestamp.from(tenMinAgo)); fullParams.add(Timestamp.from(tenMinAgo));
fullParams.add(Timestamp.from(fiveMinAgo)); fullParams.add(Timestamp.from(fiveMinAgo));
fullParams.add(TENANT); fullParams.add(tenantId);
fullParams.addAll(params); fullParams.addAll(params);
return jdbc.query(sql, (rs, rowNum) -> { return jdbc.query(sql, (rs, rowNum) -> {
@@ -253,7 +253,7 @@ public class ClickHouseStatsStore implements StatsStore {
"WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?"; "WHERE tenant_id = ? AND status = 'FAILED' AND start_time >= ? AND start_time < ?";
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(TENANT); params.add(tenantId);
params.add(Timestamp.from(from)); params.add(Timestamp.from(from));
params.add(Timestamp.from(to)); params.add(Timestamp.from(to));
if (applicationId != null) { if (applicationId != null) {
@@ -275,7 +275,7 @@ public class ClickHouseStatsStore implements StatsStore {
"countMerge(total_count) AS total_count, " + "countMerge(total_count) AS total_count, " +
"countIfMerge(failed_count) AS failed_count " + "countIfMerge(failed_count) AS failed_count " +
"FROM " + view + "FROM " + view +
" WHERE tenant_id = " + lit(TENANT) + " WHERE tenant_id = " + lit(tenantId) +
" AND bucket >= " + lit(from) + " AND bucket >= " + lit(from) +
" AND bucket < " + lit(to); " AND bucket < " + lit(to);
if (applicationId != null) { if (applicationId != null) {
@@ -327,7 +327,7 @@ public class ClickHouseStatsStore implements StatsStore {
"quantileMerge(0.99)(p99_duration) AS p99_duration, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
runningCol + " AS active_count " + runningCol + " AS active_count " +
"FROM " + view + "FROM " + view +
" WHERE tenant_id = " + lit(TENANT) + " WHERE tenant_id = " + lit(tenantId) +
" AND bucket >= " + lit(rangeFrom) + " AND bucket >= " + lit(rangeFrom) +
" AND bucket < " + lit(rangeTo); " AND bucket < " + lit(rangeTo);
for (Filter f : filters) { for (Filter f : filters) {
@@ -413,7 +413,7 @@ public class ClickHouseStatsStore implements StatsStore {
"quantileMerge(0.99)(p99_duration) AS p99_duration, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
runningCol + " AS active_count " + runningCol + " AS active_count " +
"FROM " + view + "FROM " + view +
" WHERE tenant_id = " + lit(TENANT) + " WHERE tenant_id = " + lit(tenantId) +
" AND bucket >= " + lit(from) + " AND bucket >= " + lit(from) +
" AND bucket < " + lit(to); " AND bucket < " + lit(to);
for (Filter f : filters) { for (Filter f : filters) {
@@ -453,7 +453,7 @@ public class ClickHouseStatsStore implements StatsStore {
"quantileMerge(0.99)(p99_duration) AS p99_duration, " + "quantileMerge(0.99)(p99_duration) AS p99_duration, " +
"countIfMerge(running_count) AS active_count " + "countIfMerge(running_count) AS active_count " +
"FROM " + view + "FROM " + view +
" WHERE tenant_id = " + lit(TENANT) + " WHERE tenant_id = " + lit(tenantId) +
" AND bucket >= " + lit(from) + " AND bucket >= " + lit(from) +
" AND bucket < " + lit(to); " AND bucket < " + lit(to);
for (Filter f : filters) { for (Filter f : filters) {
@@ -499,7 +499,7 @@ public class ClickHouseStatsStore implements StatsStore {
rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
rs.getLong("active_count") rs.getLong("active_count")
}, TENANT, ts(from), ts(to), routeId, processorType); }, tenantId, ts(from), ts(to), routeId, processorType);
if (!currentResult.isEmpty()) { if (!currentResult.isEmpty()) {
long[] r = currentResult.get(0); long[] r = currentResult.get(0);
totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3]; totalCount = r[0]; failedCount = r[1]; avgDuration = r[2]; p99Duration = r[3];
@@ -511,7 +511,7 @@ public class ClickHouseStatsStore implements StatsStore {
var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{ var prevResult = jdbc.query(sql, (rs, rowNum) -> new long[]{
rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration") (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration")
}, TENANT, ts(prevFrom), ts(prevTo), routeId, processorType); }, tenantId, ts(prevFrom), ts(prevTo), routeId, processorType);
if (!prevResult.isEmpty()) { if (!prevResult.isEmpty()) {
long[] r = prevResult.get(0); long[] r = prevResult.get(0);
prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3]; prevTotal = r[0]; prevFailed = r[1]; prevAvg = r[2]; prevP99 = r[3];
@@ -520,7 +520,7 @@ public class ClickHouseStatsStore implements StatsStore {
Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS); Instant todayStart = Instant.now().truncatedTo(ChronoUnit.DAYS);
long totalToday = 0; long totalToday = 0;
var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"), var todayResult = jdbc.query(sql, (rs, rowNum) -> rs.getLong("total_count"),
TENANT, ts(todayStart), ts(Instant.now()), routeId, processorType); tenantId, ts(todayStart), ts(Instant.now()), routeId, processorType);
if (!todayResult.isEmpty()) totalToday = todayResult.get(0); if (!todayResult.isEmpty()) totalToday = todayResult.get(0);
return new ExecutionStats( return new ExecutionStats(
@@ -555,7 +555,7 @@ public class ClickHouseStatsStore implements StatsStore {
rs.getLong("total_count"), rs.getLong("failed_count"), rs.getLong("total_count"), rs.getLong("failed_count"),
(long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"), (long) rs.getDouble("avg_duration"), (long) rs.getDouble("p99_duration"),
rs.getLong("active_count") rs.getLong("active_count")
), TENANT, ts(from), ts(to), routeId, processorType); ), tenantId, ts(from), ts(to), routeId, processorType);
return new StatsTimeseries(buckets); return new StatsTimeseries(buckets);
} }

View File

@@ -17,10 +17,12 @@ public class ClickHouseUsageTracker implements UsageTracker {
private static final Logger log = LoggerFactory.getLogger(ClickHouseUsageTracker.class); private static final Logger log = LoggerFactory.getLogger(ClickHouseUsageTracker.class);
private final String tenantId;
private final JdbcTemplate jdbc; private final JdbcTemplate jdbc;
private final WriteBuffer<UsageEvent> buffer; private final WriteBuffer<UsageEvent> buffer;
public ClickHouseUsageTracker(JdbcTemplate jdbc, WriteBuffer<UsageEvent> buffer) { public ClickHouseUsageTracker(String tenantId, JdbcTemplate jdbc, WriteBuffer<UsageEvent> buffer) {
this.tenantId = tenantId;
this.jdbc = jdbc; this.jdbc = jdbc;
this.buffer = buffer; this.buffer = buffer;
} }
@@ -35,11 +37,12 @@ public class ClickHouseUsageTracker implements UsageTracker {
if (batch.isEmpty()) return; if (batch.isEmpty()) return;
jdbc.batchUpdate(""" jdbc.batchUpdate("""
INSERT INTO usage_events (timestamp, username, method, path, normalized, INSERT INTO usage_events (tenant_id, timestamp, username, method, path, normalized,
status_code, duration_ms, query_params) status_code, duration_ms, query_params)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""", """,
batch.stream().map(e -> new Object[]{ batch.stream().map(e -> new Object[]{
tenantId,
Timestamp.from(e.timestamp()), Timestamp.from(e.timestamp()),
e.username(), e.username(),
e.method(), e.method(),
@@ -59,9 +62,10 @@ public class ClickHouseUsageTracker implements UsageTracker {
count() AS cnt, count() AS cnt,
avg(duration_ms) AS avg_dur avg(duration_ms) AS avg_dur
FROM usage_events FROM usage_events
WHERE timestamp >= ? AND timestamp < ? WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ?
"""); """);
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(tenantId);
params.add(Timestamp.from(from)); params.add(Timestamp.from(from));
params.add(Timestamp.from(to)); params.add(Timestamp.from(to));
@@ -80,13 +84,13 @@ public class ClickHouseUsageTracker implements UsageTracker {
String sql = """ String sql = """
SELECT username AS key, count() AS cnt, avg(duration_ms) AS avg_dur SELECT username AS key, count() AS cnt, avg(duration_ms) AS avg_dur
FROM usage_events FROM usage_events
WHERE timestamp >= ? AND timestamp < ? WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ?
GROUP BY key ORDER BY cnt DESC LIMIT 100 GROUP BY key ORDER BY cnt DESC LIMIT 100
"""; """;
return jdbc.query(sql, (rs, i) -> new UsageStats( return jdbc.query(sql, (rs, i) -> new UsageStats(
rs.getString("key"), rs.getLong("cnt"), rs.getLong("avg_dur")), rs.getString("key"), rs.getLong("cnt"), rs.getLong("avg_dur")),
Timestamp.from(from), Timestamp.from(to)); tenantId, Timestamp.from(from), Timestamp.from(to));
} }
public List<UsageStats> queryByHour(Instant from, Instant to, String username) { public List<UsageStats> queryByHour(Instant from, Instant to, String username) {
@@ -95,9 +99,10 @@ public class ClickHouseUsageTracker implements UsageTracker {
count() AS cnt, count() AS cnt,
avg(duration_ms) AS avg_dur avg(duration_ms) AS avg_dur
FROM usage_events FROM usage_events
WHERE timestamp >= ? AND timestamp < ? WHERE tenant_id = ? AND timestamp >= ? AND timestamp < ?
"""); """);
List<Object> params = new ArrayList<>(); List<Object> params = new ArrayList<>();
params.add(tenantId);
params.add(Timestamp.from(from)); params.add(Timestamp.from(from));
params.add(Timestamp.from(to)); params.add(Timestamp.from(to));

View File

@@ -38,6 +38,8 @@ ingestion:
flush-interval-ms: 5000 flush-interval-ms: 5000
cameleer: cameleer:
tenant:
id: ${CAMELEER_TENANT_ID:default}
body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384} body-size-limit: ${CAMELEER_BODY_SIZE_LIMIT:16384}
indexer: indexer:
debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000} debounce-ms: ${CAMELEER_INDEXER_DEBOUNCE_MS:2000}

View File

@@ -6,6 +6,7 @@
CREATE TABLE IF NOT EXISTS agent_metrics ( CREATE TABLE IF NOT EXISTS agent_metrics (
tenant_id LowCardinality(String) DEFAULT 'default', tenant_id LowCardinality(String) DEFAULT 'default',
collected_at DateTime64(3), collected_at DateTime64(3),
environment LowCardinality(String) DEFAULT 'default',
instance_id LowCardinality(String), instance_id LowCardinality(String),
metric_name LowCardinality(String), metric_name LowCardinality(String),
metric_value Float64, metric_value Float64,
@@ -14,7 +15,7 @@ CREATE TABLE IF NOT EXISTS agent_metrics (
) )
ENGINE = MergeTree() ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(collected_at)) PARTITION BY (tenant_id, toYYYYMM(collected_at))
ORDER BY (tenant_id, instance_id, metric_name, collected_at) ORDER BY (tenant_id, collected_at, environment, instance_id, metric_name)
TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE TTL toDateTime(collected_at) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
@@ -28,6 +29,7 @@ CREATE TABLE IF NOT EXISTS executions (
route_id LowCardinality(String), route_id LowCardinality(String),
instance_id LowCardinality(String), instance_id LowCardinality(String),
application_id LowCardinality(String), application_id LowCardinality(String),
environment LowCardinality(String) DEFAULT 'default',
status LowCardinality(String), status LowCardinality(String),
correlation_id String DEFAULT '', correlation_id String DEFAULT '',
exchange_id String DEFAULT '', exchange_id String DEFAULT '',
@@ -68,7 +70,7 @@ CREATE TABLE IF NOT EXISTS executions (
) )
ENGINE = ReplacingMergeTree(_version) ENGINE = ReplacingMergeTree(_version)
PARTITION BY (tenant_id, toYYYYMM(start_time)) PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_id, route_id, execution_id) ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
@@ -85,6 +87,7 @@ CREATE TABLE IF NOT EXISTS processor_executions (
start_time DateTime64(3), start_time DateTime64(3),
route_id LowCardinality(String), route_id LowCardinality(String),
application_id LowCardinality(String), application_id LowCardinality(String),
environment LowCardinality(String) DEFAULT 'default',
iteration Nullable(Int32), iteration Nullable(Int32),
iteration_size Nullable(Int32), iteration_size Nullable(Int32),
status LowCardinality(String), status LowCardinality(String),
@@ -116,7 +119,7 @@ CREATE TABLE IF NOT EXISTS processor_executions (
) )
ENGINE = MergeTree() ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(start_time)) PARTITION BY (tenant_id, toYYYYMM(start_time))
ORDER BY (tenant_id, start_time, application_id, route_id, execution_id, seq) ORDER BY (tenant_id, start_time, environment, application_id, route_id, execution_id, seq)
TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE TTL toDateTime(start_time) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
@@ -127,6 +130,7 @@ SETTINGS index_granularity = 8192;
CREATE TABLE IF NOT EXISTS stats_1m_all ( CREATE TABLE IF NOT EXISTS stats_1m_all (
tenant_id LowCardinality(String), tenant_id LowCardinality(String),
bucket DateTime, bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count), total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8), failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8),
@@ -136,13 +140,14 @@ CREATE TABLE IF NOT EXISTS stats_1m_all (
) )
ENGINE = AggregatingMergeTree() ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket)) PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, bucket) ORDER BY (tenant_id, bucket, environment)
TTL bucket + INTERVAL 365 DAY DELETE; TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_all_mv TO stats_1m_all AS
SELECT SELECT
tenant_id, tenant_id,
toStartOfMinute(start_time) AS bucket, toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count, countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count, countIfState(status = 'RUNNING') AS running_count,
@@ -150,7 +155,7 @@ SELECT
maxState(duration_ms) AS duration_max, maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration quantileState(0.99)(duration_ms) AS p99_duration
FROM executions FROM executions
GROUP BY tenant_id, bucket; GROUP BY tenant_id, bucket, environment;
-- stats_1m_app (per-application) -- stats_1m_app (per-application)
@@ -158,6 +163,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_app (
tenant_id LowCardinality(String), tenant_id LowCardinality(String),
application_id LowCardinality(String), application_id LowCardinality(String),
bucket DateTime, bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count), total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8), failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8),
@@ -167,7 +173,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_app (
) )
ENGINE = AggregatingMergeTree() ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket)) PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_id, bucket) ORDER BY (tenant_id, bucket, environment, application_id)
TTL bucket + INTERVAL 365 DAY DELETE; TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_app_mv TO stats_1m_app AS
@@ -175,6 +181,7 @@ SELECT
tenant_id, tenant_id,
application_id, application_id,
toStartOfMinute(start_time) AS bucket, toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count, countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count, countIfState(status = 'RUNNING') AS running_count,
@@ -182,7 +189,7 @@ SELECT
maxState(duration_ms) AS duration_max, maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration quantileState(0.99)(duration_ms) AS p99_duration
FROM executions FROM executions
GROUP BY tenant_id, application_id, bucket; GROUP BY tenant_id, application_id, bucket, environment;
-- stats_1m_route (per-route) -- stats_1m_route (per-route)
@@ -191,6 +198,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_route (
application_id LowCardinality(String), application_id LowCardinality(String),
route_id LowCardinality(String), route_id LowCardinality(String),
bucket DateTime, bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count), total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8), failed_count AggregateFunction(countIf, UInt8),
running_count AggregateFunction(countIf, UInt8), running_count AggregateFunction(countIf, UInt8),
@@ -200,7 +208,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_route (
) )
ENGINE = AggregatingMergeTree() ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket)) PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_id, route_id, bucket) ORDER BY (tenant_id, bucket, environment, application_id, route_id)
TTL bucket + INTERVAL 365 DAY DELETE; TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_route_mv TO stats_1m_route AS
@@ -209,6 +217,7 @@ SELECT
application_id, application_id,
route_id, route_id,
toStartOfMinute(start_time) AS bucket, toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count, countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'FAILED') AS failed_count,
countIfState(status = 'RUNNING') AS running_count, countIfState(status = 'RUNNING') AS running_count,
@@ -216,7 +225,7 @@ SELECT
maxState(duration_ms) AS duration_max, maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration quantileState(0.99)(duration_ms) AS p99_duration
FROM executions FROM executions
GROUP BY tenant_id, application_id, route_id, bucket; GROUP BY tenant_id, application_id, route_id, bucket, environment;
-- stats_1m_processor (per-processor-type) -- stats_1m_processor (per-processor-type)
@@ -225,6 +234,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor (
application_id LowCardinality(String), application_id LowCardinality(String),
processor_type LowCardinality(String), processor_type LowCardinality(String),
bucket DateTime, bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count), total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8), failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)), duration_sum AggregateFunction(sum, Nullable(Int64)),
@@ -233,7 +243,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor (
) )
ENGINE = AggregatingMergeTree() ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket)) PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_id, processor_type, bucket) ORDER BY (tenant_id, bucket, environment, application_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE; TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_mv TO stats_1m_processor AS
@@ -242,13 +252,14 @@ SELECT
application_id, application_id,
processor_type, processor_type,
toStartOfMinute(start_time) AS bucket, toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count, countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum, sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max, maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions FROM processor_executions
GROUP BY tenant_id, application_id, processor_type, bucket; GROUP BY tenant_id, application_id, processor_type, bucket, environment;
-- stats_1m_processor_detail (per-processor-id) -- stats_1m_processor_detail (per-processor-id)
@@ -259,6 +270,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
processor_id String, processor_id String,
processor_type LowCardinality(String), processor_type LowCardinality(String),
bucket DateTime, bucket DateTime,
environment LowCardinality(String) DEFAULT 'default',
total_count AggregateFunction(count), total_count AggregateFunction(count),
failed_count AggregateFunction(countIf, UInt8), failed_count AggregateFunction(countIf, UInt8),
duration_sum AggregateFunction(sum, Nullable(Int64)), duration_sum AggregateFunction(sum, Nullable(Int64)),
@@ -267,7 +279,7 @@ CREATE TABLE IF NOT EXISTS stats_1m_processor_detail (
) )
ENGINE = AggregatingMergeTree() ENGINE = AggregatingMergeTree()
PARTITION BY (tenant_id, toYYYYMM(bucket)) PARTITION BY (tenant_id, toYYYYMM(bucket))
ORDER BY (tenant_id, application_id, route_id, processor_id, processor_type, bucket) ORDER BY (tenant_id, bucket, environment, application_id, route_id, processor_id, processor_type)
TTL bucket + INTERVAL 365 DAY DELETE; TTL bucket + INTERVAL 365 DAY DELETE;
CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS CREATE MATERIALIZED VIEW IF NOT EXISTS stats_1m_processor_detail_mv TO stats_1m_processor_detail AS
@@ -278,13 +290,14 @@ SELECT
processor_id, processor_id,
processor_type, processor_type,
toStartOfMinute(start_time) AS bucket, toStartOfMinute(start_time) AS bucket,
environment,
countState() AS total_count, countState() AS total_count,
countIfState(status = 'FAILED') AS failed_count, countIfState(status = 'FAILED') AS failed_count,
sumState(duration_ms) AS duration_sum, sumState(duration_ms) AS duration_sum,
maxState(duration_ms) AS duration_max, maxState(duration_ms) AS duration_max,
quantileState(0.99)(duration_ms) AS p99_duration quantileState(0.99)(duration_ms) AS p99_duration
FROM processor_executions FROM processor_executions
GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket; GROUP BY tenant_id, application_id, route_id, processor_id, processor_type, bucket, environment;
-- ── Route Diagrams ────────────────────────────────────────────────────── -- ── Route Diagrams ──────────────────────────────────────────────────────
@@ -294,11 +307,12 @@ CREATE TABLE IF NOT EXISTS route_diagrams (
route_id LowCardinality(String), route_id LowCardinality(String),
instance_id LowCardinality(String), instance_id LowCardinality(String),
application_id LowCardinality(String), application_id LowCardinality(String),
environment LowCardinality(String) DEFAULT 'default',
definition String, definition String,
created_at DateTime64(3) DEFAULT now64(3) created_at DateTime64(3) DEFAULT now64(3)
) )
ENGINE = ReplacingMergeTree(created_at) ENGINE = ReplacingMergeTree(created_at)
ORDER BY (tenant_id, content_hash) ORDER BY (tenant_id, environment, route_id, instance_id, content_hash)
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
-- ── Agent Events ──────────────────────────────────────────────────────── -- ── Agent Events ────────────────────────────────────────────────────────
@@ -306,6 +320,7 @@ SETTINGS index_granularity = 8192;
CREATE TABLE IF NOT EXISTS agent_events ( CREATE TABLE IF NOT EXISTS agent_events (
tenant_id LowCardinality(String) DEFAULT 'default', tenant_id LowCardinality(String) DEFAULT 'default',
timestamp DateTime64(3) DEFAULT now64(3), timestamp DateTime64(3) DEFAULT now64(3),
environment LowCardinality(String) DEFAULT 'default',
instance_id LowCardinality(String), instance_id LowCardinality(String),
application_id LowCardinality(String), application_id LowCardinality(String),
event_type LowCardinality(String), event_type LowCardinality(String),
@@ -313,7 +328,7 @@ CREATE TABLE IF NOT EXISTS agent_events (
) )
ENGINE = MergeTree() ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp)) PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application_id, instance_id, timestamp) ORDER BY (tenant_id, timestamp, environment, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE; TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;
-- ── Logs ──────────────────────────────────────────────────────────────── -- ── Logs ────────────────────────────────────────────────────────────────
@@ -321,6 +336,7 @@ TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE;
CREATE TABLE IF NOT EXISTS logs ( CREATE TABLE IF NOT EXISTS logs (
tenant_id LowCardinality(String) DEFAULT 'default', tenant_id LowCardinality(String) DEFAULT 'default',
timestamp DateTime64(3), timestamp DateTime64(3),
environment LowCardinality(String) DEFAULT 'default',
application LowCardinality(String), application LowCardinality(String),
instance_id LowCardinality(String), instance_id LowCardinality(String),
level LowCardinality(String), level LowCardinality(String),
@@ -337,14 +353,16 @@ CREATE TABLE IF NOT EXISTS logs (
) )
ENGINE = MergeTree() ENGINE = MergeTree()
PARTITION BY (tenant_id, toYYYYMM(timestamp)) PARTITION BY (tenant_id, toYYYYMM(timestamp))
ORDER BY (tenant_id, application, timestamp) ORDER BY (tenant_id, timestamp, environment, application, instance_id)
TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE TTL toDateTime(timestamp) + INTERVAL 365 DAY DELETE
SETTINGS index_granularity = 8192; SETTINGS index_granularity = 8192;
-- ── Usage Events ──────────────────────────────────────────────────────── -- ── Usage Events ────────────────────────────────────────────────────────
CREATE TABLE IF NOT EXISTS usage_events ( CREATE TABLE IF NOT EXISTS usage_events (
tenant_id LowCardinality(String) DEFAULT 'default',
timestamp DateTime64(3) DEFAULT now64(3), timestamp DateTime64(3) DEFAULT now64(3),
environment LowCardinality(String) DEFAULT 'default',
username LowCardinality(String), username LowCardinality(String),
method LowCardinality(String), method LowCardinality(String),
path String, path String,
@@ -354,5 +372,5 @@ CREATE TABLE IF NOT EXISTS usage_events (
query_params String DEFAULT '' query_params String DEFAULT ''
) )
ENGINE = MergeTree() ENGINE = MergeTree()
ORDER BY (username, timestamp) ORDER BY (tenant_id, timestamp, environment, username, normalized)
TTL toDateTime(timestamp) + INTERVAL 90 DAY; TTL toDateTime(timestamp) + INTERVAL 90 DAY;

View File

@@ -30,7 +30,7 @@ public class TestSecurityHelper {
* Registers a test agent and returns a valid JWT access token with AGENT role. * Registers a test agent and returns a valid JWT access token with AGENT role.
*/ */
public String registerTestAgent(String instanceId) { public String registerTestAgent(String instanceId) {
agentRegistryService.register(instanceId, "test", "test-group", "1.0", List.of(), Map.of()); agentRegistryService.register(instanceId, "test", "test-group", "default", "1.0", List.of(), Map.of());
return jwtService.createAccessToken(instanceId, "test-group", List.of("AGENT")); return jwtService.createAccessToken(instanceId, "test-group", List.of("AGENT"));
} }

View File

@@ -42,7 +42,7 @@ class ClickHouseLogStoreIT {
ClickHouseTestHelper.executeInitSql(jdbc); ClickHouseTestHelper.executeInitSql(jdbc);
jdbc.execute("TRUNCATE TABLE logs"); jdbc.execute("TRUNCATE TABLE logs");
store = new ClickHouseLogStore(jdbc); store = new ClickHouseLogStore("default", jdbc);
} }
// ── Helpers ────────────────────────────────────────────────────────── // ── Helpers ──────────────────────────────────────────────────────────

View File

@@ -47,15 +47,15 @@ class ClickHouseSearchIndexIT {
jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions"); jdbc.execute("TRUNCATE TABLE processor_executions");
ClickHouseExecutionStore store = new ClickHouseExecutionStore(jdbc); ClickHouseExecutionStore store = new ClickHouseExecutionStore("default", jdbc);
searchIndex = new ClickHouseSearchIndex(jdbc); searchIndex = new ClickHouseSearchIndex("default", jdbc);
// Seed test data // Seed test data
Instant baseTime = Instant.parse("2026-03-31T10:00:00Z"); Instant baseTime = Instant.parse("2026-03-31T10:00:00Z");
// exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes // exec-1: COMPLETED, route-timer, agent-a, my-app, corr-1, 500ms, input_body with order number, attributes
MergedExecution exec1 = new MergedExecution( MergedExecution exec1 = new MergedExecution(
"default", 1L, "exec-1", "route-timer", "agent-a", "my-app", "default", 1L, "exec-1", "route-timer", "agent-a", "my-app", "default",
"COMPLETED", "corr-1", "exchange-1", "COMPLETED", "corr-1", "exchange-1",
baseTime, baseTime,
baseTime.plusMillis(500), baseTime.plusMillis(500),
@@ -70,7 +70,7 @@ class ClickHouseSearchIndexIT {
// exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error // exec-2: FAILED, route-timer, agent-a, my-app, corr-2, 200ms, with error
MergedExecution exec2 = new MergedExecution( MergedExecution exec2 = new MergedExecution(
"default", 1L, "exec-2", "route-timer", "agent-a", "my-app", "default", 1L, "exec-2", "route-timer", "agent-a", "my-app", "default",
"FAILED", "corr-2", "exchange-2", "FAILED", "corr-2", "exchange-2",
baseTime.plusSeconds(1), baseTime.plusSeconds(1),
baseTime.plusSeconds(1).plusMillis(200), baseTime.plusSeconds(1).plusMillis(200),
@@ -87,7 +87,7 @@ class ClickHouseSearchIndexIT {
// exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error // exec-3: COMPLETED, route-rest, agent-b, other-app, 100ms, no error
MergedExecution exec3 = new MergedExecution( MergedExecution exec3 = new MergedExecution(
"default", 1L, "exec-3", "route-rest", "agent-b", "other-app", "default", 1L, "exec-3", "route-rest", "agent-b", "other-app", "default",
"COMPLETED", "", "exchange-3", "COMPLETED", "", "exchange-3",
baseTime.plusSeconds(2), baseTime.plusSeconds(2),
baseTime.plusSeconds(2).plusMillis(100), baseTime.plusSeconds(2).plusMillis(100),

View File

@@ -39,7 +39,7 @@ class ClickHouseAgentEventRepositoryIT {
ClickHouseTestHelper.executeInitSql(jdbc); ClickHouseTestHelper.executeInitSql(jdbc);
jdbc.execute("TRUNCATE TABLE agent_events"); jdbc.execute("TRUNCATE TABLE agent_events");
repo = new ClickHouseAgentEventRepository(jdbc); repo = new ClickHouseAgentEventRepository("default", jdbc);
} }
// ── Helpers ────────────────────────────────────────────────────────────── // ── Helpers ──────────────────────────────────────────────────────────────

View File

@@ -59,13 +59,14 @@ class ClickHouseChunkPipelineIT {
jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions"); jdbc.execute("TRUNCATE TABLE processor_executions");
executionStore = new ClickHouseExecutionStore(jdbc); executionStore = new ClickHouseExecutionStore("default", jdbc);
searchIndex = new ClickHouseSearchIndex(jdbc); searchIndex = new ClickHouseSearchIndex("default", jdbc);
executionBuffer = new ArrayList<>(); executionBuffer = new ArrayList<>();
processorBuffer = new ArrayList<>(); processorBuffer = new ArrayList<>();
DiagramStore noOpDiagramStore = org.mockito.Mockito.mock(DiagramStore.class); DiagramStore noOpDiagramStore = org.mockito.Mockito.mock(DiagramStore.class);
accumulator = new ChunkAccumulator(executionBuffer::add, processorBuffer::add, noOpDiagramStore, Duration.ofMinutes(5)); accumulator = new ChunkAccumulator("default", executionBuffer::add, processorBuffer::add,
noOpDiagramStore, Duration.ofMinutes(5), id -> "default");
} }
@Test @Test

View File

@@ -41,7 +41,7 @@ class ClickHouseDiagramStoreIT {
ClickHouseTestHelper.executeInitSql(jdbc); ClickHouseTestHelper.executeInitSql(jdbc);
jdbc.execute("TRUNCATE TABLE route_diagrams"); jdbc.execute("TRUNCATE TABLE route_diagrams");
store = new ClickHouseDiagramStore(jdbc); store = new ClickHouseDiagramStore("default", jdbc);
} }
// ── Helpers ────────────────────────────────────────────────────────── // ── Helpers ──────────────────────────────────────────────────────────

View File

@@ -47,7 +47,7 @@ class ClickHouseExecutionReadIT {
jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions"); jdbc.execute("TRUNCATE TABLE processor_executions");
store = new ClickHouseExecutionStore(jdbc); store = new ClickHouseExecutionStore("default", jdbc);
detailService = new DetailService(store); detailService = new DetailService(store);
} }
@@ -55,7 +55,7 @@ class ClickHouseExecutionReadIT {
private MergedExecution minimalExecution(String executionId) { private MergedExecution minimalExecution(String executionId) {
return new MergedExecution( return new MergedExecution(
"default", 1L, executionId, "route-a", "agent-1", "my-app", "default", 1L, executionId, "route-a", "agent-1", "my-app", "default",
"COMPLETED", "corr-1", "exchange-1", "COMPLETED", "corr-1", "exchange-1",
Instant.parse("2026-04-01T10:00:00Z"), Instant.parse("2026-04-01T10:00:00Z"),
Instant.parse("2026-04-01T10:00:01Z"), Instant.parse("2026-04-01T10:00:01Z"),

View File

@@ -43,13 +43,13 @@ class ClickHouseExecutionStoreIT {
jdbc.execute("TRUNCATE TABLE executions"); jdbc.execute("TRUNCATE TABLE executions");
jdbc.execute("TRUNCATE TABLE processor_executions"); jdbc.execute("TRUNCATE TABLE processor_executions");
store = new ClickHouseExecutionStore(jdbc); store = new ClickHouseExecutionStore("default", jdbc);
} }
@Test @Test
void insertExecutionBatch_writesToClickHouse() { void insertExecutionBatch_writesToClickHouse() {
MergedExecution exec = new MergedExecution( MergedExecution exec = new MergedExecution(
"default", 1L, "exec-1", "route-a", "agent-1", "my-app", "default", 1L, "exec-1", "route-a", "agent-1", "my-app", "default",
"COMPLETED", "corr-1", "exchange-1", "COMPLETED", "corr-1", "exchange-1",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:01Z"), Instant.parse("2026-03-31T10:00:01Z"),
@@ -181,7 +181,7 @@ class ClickHouseExecutionStoreIT {
@Test @Test
void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() { void insertExecutionBatch_replacingMergeTree_keepsLatestVersion() {
MergedExecution v1 = new MergedExecution( MergedExecution v1 = new MergedExecution(
"default", 1L, "exec-r", "route-a", "agent-1", "my-app", "default", 1L, "exec-r", "route-a", "agent-1", "my-app", "default",
"RUNNING", "corr-1", "exchange-1", "RUNNING", "corr-1", "exchange-1",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
null, null, null, null,
@@ -194,7 +194,7 @@ class ClickHouseExecutionStoreIT {
); );
MergedExecution v2 = new MergedExecution( MergedExecution v2 = new MergedExecution(
"default", 2L, "exec-r", "route-a", "agent-1", "my-app", "default", 2L, "exec-r", "route-a", "agent-1", "my-app", "default",
"COMPLETED", "corr-1", "exchange-1", "COMPLETED", "corr-1", "exchange-1",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),
Instant.parse("2026-03-31T10:00:05Z"), Instant.parse("2026-03-31T10:00:05Z"),

View File

@@ -60,7 +60,7 @@ class ClickHouseMetricsQueryStoreIT {
"agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts)); "agent-1", "memory.free", 1000.0 - i * 100, java.sql.Timestamp.from(ts));
} }
queryStore = new ClickHouseMetricsQueryStore(jdbc); queryStore = new ClickHouseMetricsQueryStore("default", jdbc);
} }
@Test @Test

View File

@@ -51,7 +51,7 @@ class ClickHouseMetricsStoreIT {
jdbc.execute("TRUNCATE TABLE agent_metrics"); jdbc.execute("TRUNCATE TABLE agent_metrics");
store = new ClickHouseMetricsStore(jdbc); store = new ClickHouseMetricsStore("default", jdbc);
} }
@Test @Test

View File

@@ -75,7 +75,7 @@ class ClickHouseStatsStoreIT {
System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q")); System.out.println("LOG: " + entry.get("type") + " | " + entry.get("q"));
} }
store = new ClickHouseStatsStore(jdbc); store = new ClickHouseStatsStore("default", jdbc);
} }
private void seedTestData() { private void seedTestData() {

View File

@@ -14,6 +14,7 @@ import java.util.Map;
* @param instanceId agent-provided persistent identifier * @param instanceId agent-provided persistent identifier
* @param displayName human-readable agent name * @param displayName human-readable agent name
* @param applicationId application identifier (e.g., "order-service-prod") * @param applicationId application identifier (e.g., "order-service-prod")
* @param environmentId logical environment (e.g., "dev", "staging", "prod")
* @param version agent software version * @param version agent software version
* @param routeIds list of Camel route IDs managed by this agent * @param routeIds list of Camel route IDs managed by this agent
* @param capabilities agent-declared capabilities (free-form) * @param capabilities agent-declared capabilities (free-form)
@@ -26,6 +27,7 @@ public record AgentInfo(
String instanceId, String instanceId,
String displayName, String displayName,
String applicationId, String applicationId,
String environmentId,
String version, String version,
List<String> routeIds, List<String> routeIds,
Map<String, Object> capabilities, Map<String, Object> capabilities,
@@ -36,33 +38,33 @@ public record AgentInfo(
) { ) {
public AgentInfo withState(AgentState newState) { public AgentInfo withState(AgentState newState) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities,
newState, registeredAt, lastHeartbeat, staleTransitionTime); newState, registeredAt, lastHeartbeat, staleTransitionTime);
} }
public AgentInfo withLastHeartbeat(Instant newLastHeartbeat) { public AgentInfo withLastHeartbeat(Instant newLastHeartbeat) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities,
state, registeredAt, newLastHeartbeat, staleTransitionTime); state, registeredAt, newLastHeartbeat, staleTransitionTime);
} }
public AgentInfo withRegisteredAt(Instant newRegisteredAt) { public AgentInfo withRegisteredAt(Instant newRegisteredAt) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities,
state, newRegisteredAt, lastHeartbeat, staleTransitionTime); state, newRegisteredAt, lastHeartbeat, staleTransitionTime);
} }
public AgentInfo withStaleTransitionTime(Instant newStaleTransitionTime) { public AgentInfo withStaleTransitionTime(Instant newStaleTransitionTime) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities,
state, registeredAt, lastHeartbeat, newStaleTransitionTime); state, registeredAt, lastHeartbeat, newStaleTransitionTime);
} }
public AgentInfo withCapabilities(Map<String, Object> newCapabilities) { public AgentInfo withCapabilities(Map<String, Object> newCapabilities) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, newCapabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, newCapabilities,
state, registeredAt, lastHeartbeat, staleTransitionTime); state, registeredAt, lastHeartbeat, staleTransitionTime);
} }
public AgentInfo withMetadata(String displayName, String applicationId, String version, public AgentInfo withMetadata(String displayName, String applicationId, String environmentId,
List<String> routeIds, Map<String, Object> capabilities) { String version, List<String> routeIds, Map<String, Object> capabilities) {
return new AgentInfo(instanceId, displayName, applicationId, version, routeIds, capabilities, return new AgentInfo(instanceId, displayName, applicationId, environmentId, version, routeIds, capabilities,
state, registeredAt, lastHeartbeat, staleTransitionTime); state, registeredAt, lastHeartbeat, staleTransitionTime);
} }
} }

View File

@@ -46,10 +46,10 @@ public class AgentRegistryService {
* Register a new agent or re-register an existing one. * Register a new agent or re-register an existing one.
* Re-registration updates metadata, transitions state to LIVE, and resets timestamps. * Re-registration updates metadata, transitions state to LIVE, and resets timestamps.
*/ */
public AgentInfo register(String id, String name, String application, String version, public AgentInfo register(String id, String name, String application, String environmentId,
List<String> routeIds, Map<String, Object> capabilities) { String version, List<String> routeIds, Map<String, Object> capabilities) {
Instant now = Instant.now(); Instant now = Instant.now();
AgentInfo newAgent = new AgentInfo(id, name, application, version, AgentInfo newAgent = new AgentInfo(id, name, application, environmentId, version,
List.copyOf(routeIds), Map.copyOf(capabilities), List.copyOf(routeIds), Map.copyOf(capabilities),
AgentState.LIVE, now, now, null); AgentState.LIVE, now, now, null);
@@ -58,13 +58,13 @@ public class AgentRegistryService {
// Re-registration: update metadata, reset to LIVE // Re-registration: update metadata, reset to LIVE
log.info("Agent {} re-registering (was {})", id, existing.state()); log.info("Agent {} re-registering (was {})", id, existing.state());
return existing return existing
.withMetadata(name, application, version, List.copyOf(routeIds), Map.copyOf(capabilities)) .withMetadata(name, application, environmentId, version, List.copyOf(routeIds), Map.copyOf(capabilities))
.withState(AgentState.LIVE) .withState(AgentState.LIVE)
.withLastHeartbeat(now) .withLastHeartbeat(now)
.withRegisteredAt(now) .withRegisteredAt(now)
.withStaleTransitionTime(null); .withStaleTransitionTime(null);
} }
log.info("Agent {} registered (name={}, application={})", id, name, application); log.info("Agent {} registered (name={}, application={}, env={})", id, name, application, environmentId);
return newAgent; return newAgent;
}); });
} }

View File

@@ -6,6 +6,8 @@ import com.cameleer3.common.model.LogEntry;
* A log entry paired with its agent metadata, ready for buffered ClickHouse insertion. * A log entry paired with its agent metadata, ready for buffered ClickHouse insertion.
*/ */
public record BufferedLogEntry( public record BufferedLogEntry(
String tenantId,
String environment,
String instanceId, String instanceId,
String applicationId, String applicationId,
LogEntry entry LogEntry entry

View File

@@ -15,6 +15,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function;
/** /**
* Accumulates {@link ExecutionChunk} documents and produces: * Accumulates {@link ExecutionChunk} documents and produces:
@@ -26,23 +27,28 @@ import java.util.function.Consumer;
public class ChunkAccumulator { public class ChunkAccumulator {
private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class); private static final Logger log = LoggerFactory.getLogger(ChunkAccumulator.class);
private static final String DEFAULT_TENANT = "default";
private static final ObjectMapper MAPPER = new ObjectMapper(); private static final ObjectMapper MAPPER = new ObjectMapper();
private final String tenantId;
private final Consumer<MergedExecution> executionSink; private final Consumer<MergedExecution> executionSink;
private final Consumer<ProcessorBatch> processorSink; private final Consumer<ProcessorBatch> processorSink;
private final DiagramStore diagramStore; private final DiagramStore diagramStore;
private final Duration staleThreshold; private final Duration staleThreshold;
private final Function<String, String> environmentResolver;
private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, PendingExchange> pending = new ConcurrentHashMap<>();
public ChunkAccumulator(Consumer<MergedExecution> executionSink, public ChunkAccumulator(String tenantId,
Consumer<MergedExecution> executionSink,
Consumer<ProcessorBatch> processorSink, Consumer<ProcessorBatch> processorSink,
DiagramStore diagramStore, DiagramStore diagramStore,
Duration staleThreshold) { Duration staleThreshold,
Function<String, String> environmentResolver) {
this.tenantId = tenantId;
this.executionSink = executionSink; this.executionSink = executionSink;
this.processorSink = processorSink; this.processorSink = processorSink;
this.diagramStore = diagramStore; this.diagramStore = diagramStore;
this.staleThreshold = staleThreshold; this.staleThreshold = staleThreshold;
this.environmentResolver = environmentResolver;
} }
/** /**
@@ -51,13 +57,16 @@ public class ChunkAccumulator {
*/ */
public void onChunk(ExecutionChunk chunk) { public void onChunk(ExecutionChunk chunk) {
// 1. Push processor records immediately (append-only) // 1. Push processor records immediately (append-only)
String environment = environmentResolver.apply(
chunk.getInstanceId() != null ? chunk.getInstanceId() : "");
boolean chunkHasTrace = false; boolean chunkHasTrace = false;
if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) { if (chunk.getProcessors() != null && !chunk.getProcessors().isEmpty()) {
processorSink.accept(new ProcessorBatch( processorSink.accept(new ProcessorBatch(
DEFAULT_TENANT, this.tenantId,
chunk.getExchangeId(), chunk.getExchangeId(),
chunk.getRouteId(), chunk.getRouteId(),
chunk.getApplicationId(), chunk.getApplicationId(),
environment,
chunk.getStartTime(), chunk.getStartTime(),
chunk.getProcessors())); chunk.getProcessors()));
chunkHasTrace = chunk.getProcessors().stream() chunkHasTrace = chunk.getProcessors().stream()
@@ -164,13 +173,16 @@ public class ChunkAccumulator {
} catch (Exception e) { } catch (Exception e) {
log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId()); log.debug("Could not resolve diagram hash for route={}", envelope.getRouteId());
} }
String env = environmentResolver.apply(
envelope.getInstanceId() != null ? envelope.getInstanceId() : "");
return new MergedExecution( return new MergedExecution(
DEFAULT_TENANT, this.tenantId,
1L, 1L,
envelope.getExchangeId(), envelope.getExchangeId(),
envelope.getRouteId(), envelope.getRouteId(),
envelope.getInstanceId(), envelope.getInstanceId(),
envelope.getApplicationId(), envelope.getApplicationId(),
env,
envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING", envelope.getStatus() != null ? envelope.getStatus().name() : "RUNNING",
envelope.getCorrelationId(), envelope.getCorrelationId(),
envelope.getExchangeId(), envelope.getExchangeId(),
@@ -236,6 +248,7 @@ public class ChunkAccumulator {
String executionId, String executionId,
String routeId, String routeId,
String applicationId, String applicationId,
String environment,
Instant execStartTime, Instant execStartTime,
List<FlatProcessorRecord> processors List<FlatProcessorRecord> processors
) {} ) {}

View File

@@ -13,6 +13,7 @@ public record MergedExecution(
String routeId, String routeId,
String instanceId, String instanceId,
String applicationId, String applicationId,
String environment,
String status, String status,
String correlationId, String correlationId,
String exchangeId, String exchangeId,

View File

@@ -26,7 +26,7 @@ class AgentRegistryServiceTest {
@Test @Test
void registerNewAgent_createsWithLiveState() { void registerNewAgent_createsWithLiveState() {
AgentInfo agent = registry.register("agent-1", "Order Agent", "order-svc", AgentInfo agent = registry.register("agent-1", "Order Agent", "order-svc", "default",
"1.0.0", List.of("route1", "route2"), Map.of("feature", "tracing")); "1.0.0", List.of("route1", "route2"), Map.of("feature", "tracing"));
assertThat(agent).isNotNull(); assertThat(agent).isNotNull();
@@ -44,10 +44,10 @@ class AgentRegistryServiceTest {
@Test @Test
void reRegisterSameId_updatesMetadataAndTransitionsToLive() { void reRegisterSameId_updatesMetadataAndTransitionsToLive() {
registry.register("agent-1", "Old Name", "old-group", registry.register("agent-1", "Old Name", "old-group", "default",
"1.0.0", List.of("route1"), Map.of()); "1.0.0", List.of("route1"), Map.of());
AgentInfo updated = registry.register("agent-1", "New Name", "new-group", AgentInfo updated = registry.register("agent-1", "New Name", "new-group", "default",
"2.0.0", List.of("route1", "route2"), Map.of("new", "cap")); "2.0.0", List.of("route1", "route2"), Map.of("new", "cap"));
assertThat(updated.instanceId()).isEqualTo("agent-1"); assertThat(updated.instanceId()).isEqualTo("agent-1");
@@ -62,11 +62,11 @@ class AgentRegistryServiceTest {
@Test @Test
void reRegisterSameId_updatesRegisteredAtAndLastHeartbeat() { void reRegisterSameId_updatesRegisteredAtAndLastHeartbeat() {
AgentInfo first = registry.register("agent-1", "Name", "group", AgentInfo first = registry.register("agent-1", "Name", "group", "default",
"1.0.0", List.of(), Map.of()); "1.0.0", List.of(), Map.of());
Instant firstRegisteredAt = first.registeredAt(); Instant firstRegisteredAt = first.registeredAt();
AgentInfo second = registry.register("agent-1", "Name", "group", AgentInfo second = registry.register("agent-1", "Name", "group", "default",
"1.0.0", List.of(), Map.of()); "1.0.0", List.of(), Map.of());
assertThat(second.registeredAt()).isAfterOrEqualTo(firstRegisteredAt); assertThat(second.registeredAt()).isAfterOrEqualTo(firstRegisteredAt);
@@ -79,7 +79,7 @@ class AgentRegistryServiceTest {
@Test @Test
void heartbeatKnownAgent_returnsTrue() { void heartbeatKnownAgent_returnsTrue() {
registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
boolean result = registry.heartbeat("agent-1"); boolean result = registry.heartbeat("agent-1");
@@ -88,7 +88,7 @@ class AgentRegistryServiceTest {
@Test @Test
void heartbeatKnownAgent_updatesLastHeartbeat() { void heartbeatKnownAgent_updatesLastHeartbeat() {
registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
Instant before = registry.findById("agent-1").lastHeartbeat(); Instant before = registry.findById("agent-1").lastHeartbeat();
registry.heartbeat("agent-1"); registry.heartbeat("agent-1");
@@ -106,7 +106,7 @@ class AgentRegistryServiceTest {
@Test @Test
void heartbeatStaleAgent_transitionsToLive() { void heartbeatStaleAgent_transitionsToLive() {
registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
registry.transitionState("agent-1", AgentState.STALE); registry.transitionState("agent-1", AgentState.STALE);
assertThat(registry.findById("agent-1").state()).isEqualTo(AgentState.STALE); assertThat(registry.findById("agent-1").state()).isEqualTo(AgentState.STALE);
@@ -125,7 +125,7 @@ class AgentRegistryServiceTest {
void liveAgentBeyondStaleThreshold_transitionsToStale() { void liveAgentBeyondStaleThreshold_transitionsToStale() {
// Use very short thresholds for test // Use very short thresholds for test
AgentRegistryService shortRegistry = new AgentRegistryService(1, 300_000, 60_000); AgentRegistryService shortRegistry = new AgentRegistryService(1, 300_000, 60_000);
shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
// Wait briefly to exceed 1ms threshold // Wait briefly to exceed 1ms threshold
try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
@@ -141,7 +141,7 @@ class AgentRegistryServiceTest {
void staleAgentBeyondDeadThreshold_transitionsToDead() { void staleAgentBeyondDeadThreshold_transitionsToDead() {
// Use very short thresholds for test: 1ms stale, 1ms dead // Use very short thresholds for test: 1ms stale, 1ms dead
AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000);
shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
shortRegistry.checkLifecycle(); // LIVE -> STALE shortRegistry.checkLifecycle(); // LIVE -> STALE
@@ -155,7 +155,7 @@ class AgentRegistryServiceTest {
@Test @Test
void deadAgentRemainsDead() { void deadAgentRemainsDead() {
AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000); AgentRegistryService shortRegistry = new AgentRegistryService(1, 1, 60_000);
shortRegistry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); shortRegistry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
shortRegistry.checkLifecycle(); shortRegistry.checkLifecycle();
@@ -171,7 +171,7 @@ class AgentRegistryServiceTest {
@Test @Test
void transitionState_setsStaleTransitionTimeWhenGoingStale() { void transitionState_setsStaleTransitionTimeWhenGoingStale() {
registry.register("agent-1", "Name", "group", "1.0.0", List.of(), Map.of()); registry.register("agent-1", "Name", "group", "default", "1.0.0", List.of(), Map.of());
registry.transitionState("agent-1", AgentState.STALE); registry.transitionState("agent-1", AgentState.STALE);
@@ -186,8 +186,8 @@ class AgentRegistryServiceTest {
@Test @Test
void findAll_returnsAllAgents() { void findAll_returnsAllAgents() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); registry.register("agent-2", "A2", "g", "default", "1.0", List.of(), Map.of());
List<AgentInfo> all = registry.findAll(); List<AgentInfo> all = registry.findAll();
@@ -197,8 +197,8 @@ class AgentRegistryServiceTest {
@Test @Test
void findByState_filtersCorrectly() { void findByState_filtersCorrectly() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
registry.register("agent-2", "A2", "g", "1.0", List.of(), Map.of()); registry.register("agent-2", "A2", "g", "default", "1.0", List.of(), Map.of());
registry.transitionState("agent-2", AgentState.STALE); registry.transitionState("agent-2", AgentState.STALE);
List<AgentInfo> live = registry.findByState(AgentState.LIVE); List<AgentInfo> live = registry.findByState(AgentState.LIVE);
@@ -217,7 +217,7 @@ class AgentRegistryServiceTest {
@Test @Test
void findById_knownReturnsAgent() { void findById_knownReturnsAgent() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AgentInfo result = registry.findById("agent-1"); AgentInfo result = registry.findById("agent-1");
@@ -231,7 +231,7 @@ class AgentRegistryServiceTest {
@Test @Test
void addCommand_createsPendingCommand() { void addCommand_createsPendingCommand() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{\"key\":\"val\"}"); AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{\"key\":\"val\"}");
@@ -246,7 +246,7 @@ class AgentRegistryServiceTest {
@Test @Test
void addCommand_notifiesEventListener() { void addCommand_notifiesEventListener() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AtomicReference<AgentCommand> received = new AtomicReference<>(); AtomicReference<AgentCommand> received = new AtomicReference<>();
registry.setEventListener((agentId, command) -> received.set(command)); registry.setEventListener((agentId, command) -> received.set(command));
@@ -259,7 +259,7 @@ class AgentRegistryServiceTest {
@Test @Test
void acknowledgeCommand_transitionsStatus() { void acknowledgeCommand_transitionsStatus() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AgentCommand cmd = registry.addCommand("agent-1", CommandType.REPLAY, "{}"); AgentCommand cmd = registry.addCommand("agent-1", CommandType.REPLAY, "{}");
boolean acked = registry.acknowledgeCommand("agent-1", cmd.id()); boolean acked = registry.acknowledgeCommand("agent-1", cmd.id());
@@ -269,7 +269,7 @@ class AgentRegistryServiceTest {
@Test @Test
void acknowledgeCommand_unknownReturnsFalse() { void acknowledgeCommand_unknownReturnsFalse() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
boolean acked = registry.acknowledgeCommand("agent-1", "nonexistent-cmd"); boolean acked = registry.acknowledgeCommand("agent-1", "nonexistent-cmd");
@@ -278,7 +278,7 @@ class AgentRegistryServiceTest {
@Test @Test
void findPendingCommands_returnsOnlyPending() { void findPendingCommands_returnsOnlyPending() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AgentCommand cmd1 = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); AgentCommand cmd1 = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}");
AgentCommand cmd2 = registry.addCommand("agent-1", CommandType.DEEP_TRACE, "{}"); AgentCommand cmd2 = registry.addCommand("agent-1", CommandType.DEEP_TRACE, "{}");
registry.acknowledgeCommand("agent-1", cmd1.id()); registry.acknowledgeCommand("agent-1", cmd1.id());
@@ -291,7 +291,7 @@ class AgentRegistryServiceTest {
@Test @Test
void markDelivered_updatesStatus() { void markDelivered_updatesStatus() {
registry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); registry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); AgentCommand cmd = registry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}");
registry.markDelivered("agent-1", cmd.id()); registry.markDelivered("agent-1", cmd.id());
@@ -305,7 +305,7 @@ class AgentRegistryServiceTest {
void expireOldCommands_removesExpiredPendingCommands() { void expireOldCommands_removesExpiredPendingCommands() {
// Use 1ms expiry for test // Use 1ms expiry for test
AgentRegistryService shortRegistry = new AgentRegistryService(90_000, 300_000, 1); AgentRegistryService shortRegistry = new AgentRegistryService(90_000, 300_000, 1);
shortRegistry.register("agent-1", "A1", "g", "1.0", List.of(), Map.of()); shortRegistry.register("agent-1", "A1", "g", "default", "1.0", List.of(), Map.of());
shortRegistry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}"); shortRegistry.addCommand("agent-1", CommandType.CONFIG_UPDATE, "{}");
try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } try { Thread.sleep(5); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }

View File

@@ -35,7 +35,8 @@ class ChunkAccumulatorTest {
executionSink = new CopyOnWriteArrayList<>(); executionSink = new CopyOnWriteArrayList<>();
processorSink = new CopyOnWriteArrayList<>(); processorSink = new CopyOnWriteArrayList<>();
accumulator = new ChunkAccumulator( accumulator = new ChunkAccumulator(
executionSink::add, processorSink::add, NO_OP_DIAGRAM_STORE, Duration.ofMinutes(5)); "default", executionSink::add, processorSink::add,
NO_OP_DIAGRAM_STORE, Duration.ofMinutes(5), id -> "default");
} }
@Test @Test
@@ -119,7 +120,8 @@ class ChunkAccumulatorTest {
@Test @Test
void staleExchange_flushedBySweep() throws Exception { void staleExchange_flushedBySweep() throws Exception {
ChunkAccumulator staleAccumulator = new ChunkAccumulator( ChunkAccumulator staleAccumulator = new ChunkAccumulator(
executionSink::add, processorSink::add, NO_OP_DIAGRAM_STORE, Duration.ofMillis(1)); "default", executionSink::add, processorSink::add,
NO_OP_DIAGRAM_STORE, Duration.ofMillis(1), id -> "default");
ExecutionChunk c = chunk("ex-3", "RUNNING", ExecutionChunk c = chunk("ex-3", "RUNNING",
Instant.parse("2026-03-31T10:00:00Z"), Instant.parse("2026-03-31T10:00:00Z"),

View File

@@ -99,6 +99,8 @@ spec:
value: "clickhouse" value: "clickhouse"
- name: CAMELEER_STORAGE_EXECUTIONS - name: CAMELEER_STORAGE_EXECUTIONS
value: "clickhouse" value: "clickhouse"
- name: CAMELEER_TENANT_ID
value: "default"
resources: resources:
requests: requests: