Skip to content

[ISSUE #5259] Add A2A Gateway: REST API, SSE streaming, Task lifecycle, Java SDK and tests#5260

Merged
xwm1992 merged 8 commits into
apache:masterfrom
qqeasonchen:feat/a2a-gateway-enhancement
Jun 29, 2026
Merged

[ISSUE #5259] Add A2A Gateway: REST API, SSE streaming, Task lifecycle, Java SDK and tests#5260
xwm1992 merged 8 commits into
apache:masterfrom
qqeasonchen:feat/a2a-gateway-enhancement

Conversation

@qqeasonchen

@qqeasonchen qqeasonchen commented Jun 28, 2026

Copy link
Copy Markdown
Contributor

#5259

What's New

A2A Gateway Runtime (eventmesh-runtime)

  • A2AGatewayServer: standalone HTTP server entry point
  • A2AGatewayService: core gateway logic with task timeout, agent validation, cancel notification, and concurrent-safe status subscribers
  • A2AGatewayHttpHandler: Netty-based REST handler with CORS, SSE streaming, health check, task list pagination, and precise error codes
  • TaskRegistry: in-memory task lifecycle management with TTL cleanup, CopyOnWriteArrayList for parent-child concurrency safety
  • InMemoryA2AMessageTransport: default transport implementation
  • A2ACardHttpHandler: enhanced with CORS headers

A2A Protocol Plugin (eventmesh-protocol-a2a)

  • A2AClient: typed Java SDK with sync/async task send, SSE streaming (streamTaskStatus), agent discovery, heartbeat, and SSE-aware socket timeout
  • A2AMessageTransport: transport abstraction interface
  • A2ATopicFactory: A2A topic convention standardization
  • A2ATopicFactoryTest: unit tests

Examples

  • A2AGatewayDemo: end-to-end HTTP client demo

Tests (all passing)

  • A2AGatewayServiceTest: 15+ tests covering timeout, agent validation, cancel, SSE, pagination
  • TaskRegistryTest: TTL, concurrent child creation, lifecycle transitions
  • A2AClientServerIntegrationTest: full client-server integration with SSE streaming, CORS, health, heartbeat
  • A2AGatewayEndToEndTest: end-to-end pipeline verification
  • InMemoryA2AMessageTransportTest: transport unit tests

Documentation (7 files updated)

  • eventmesh-a2a-design.md, ARCHITECTURE.md, IMPLEMENTATION_SUMMARY.md, IMPLEMENTATION_SUMMARY_EN.md, TEST_RESULTS.md, README.md, README_EN.md

Key Engineering Decisions

  • Task timeout: auto-fail non-terminal tasks after configurable threshold
  • SSE: heartbeat mechanism + socketTimeout(0) for long-lived connections
  • Concurrency: CopyOnWriteArrayList + ConcurrentHashMap throughout
  • CORS: Access-Control-Allow-Origin on all A2A endpoints
  • Agent validation: reject tasks for unregistered agents

Fixes #issue_id

Motivation

Explain the content here.
Explain why you want to make the changes and what problem you're trying to solve.

Modifications

Describe the modifications you've done.

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a followup issue for adding the documentation

…cle, Java SDK and tests

## What's New

### A2A Gateway Runtime (eventmesh-runtime)
- A2AGatewayServer: standalone HTTP server entry point
- A2AGatewayService: core gateway logic with task timeout, agent validation,
  cancel notification, and concurrent-safe status subscribers
- A2AGatewayHttpHandler: Netty-based REST handler with CORS, SSE streaming,
  health check, task list pagination, and precise error codes
- TaskRegistry: in-memory task lifecycle management with TTL cleanup,
  CopyOnWriteArrayList for parent-child concurrency safety
- InMemoryA2AMessageTransport: default transport implementation
- A2ACardHttpHandler: enhanced with CORS headers

### A2A Protocol Plugin (eventmesh-protocol-a2a)
- A2AClient: typed Java SDK with sync/async task send, SSE streaming
  (streamTaskStatus), agent discovery, heartbeat, and SSE-aware socket timeout
- A2AMessageTransport: transport abstraction interface
- A2ATopicFactory: A2A topic convention standardization
- A2ATopicFactoryTest: unit tests

### Examples
- A2AGatewayDemo: end-to-end HTTP client demo

### Tests (all passing)
- A2AGatewayServiceTest: 15+ tests covering timeout, agent validation,
  cancel, SSE, pagination
- TaskRegistryTest: TTL, concurrent child creation, lifecycle transitions
- A2AClientServerIntegrationTest: full client-server integration with
  SSE streaming, CORS, health, heartbeat
- A2AGatewayEndToEndTest: end-to-end pipeline verification
- InMemoryA2AMessageTransportTest: transport unit tests

### Documentation (7 files updated)
- eventmesh-a2a-design.md, ARCHITECTURE.md, IMPLEMENTATION_SUMMARY.md,
  IMPLEMENTATION_SUMMARY_EN.md, TEST_RESULTS.md, README.md, README_EN.md

### Key Engineering Decisions
- Task timeout: auto-fail non-terminal tasks after configurable threshold
- SSE: heartbeat mechanism + socketTimeout(0) for long-lived connections
- Concurrency: CopyOnWriteArrayList + ConcurrentHashMap throughout
- CORS: Access-Control-Allow-Origin on all A2A endpoints
- Agent validation: reject tasks for unregistered agents
@qqeasonchen qqeasonchen changed the title [ISSUE #5259] Add A2A Gateway: REST API, SSE streaming, Task lifecy… [ISSUE #5259] Add A2A Gateway: REST API, SSE streaming, Task lifecycle, Java SDK and tests Jun 28, 2026
Replace '-->' arrow notation with HTML entity '&gt;' in Javadoc <pre> block
to fix javadoc compilation error: 'bad use of ">".

The '>' character inside <pre> blocks in Javadoc must be escaped as '&gt;'
when it appears after '-' characters (forming '-->' which Java parser
misinterprets as an HTML comment end marker).

Fixes: CodeQL Analyze (java) and all Build CI failures in PR apache#5260.
Replace '->' with '-&gt;' in Javadoc <pre> block to avoid
javadoc 'bad use of >' compilation error. The Java parser
interprets '>' in <pre> blocks as HTML tag closers.

Fixes: remaining CI build failure in PR apache#5260 after first fix.
checkstyleMain (4 files, 8 violations):
- Remove unused imports (ArrayList, Objects)
- Fix import ordering per project convention: static first, then
  org.apache.eventmesh, org.apache, java, io, org.junit, com
- Replace Unicode escape (\u00b0) with literal degree symbol
- Add missing Javadoc for StatusSubscriber interface

checkstyleTest (5 files, 9 violations):
- Move static imports to top (option=top in checkstyle config)
- Fix import group ordering (io before org per groups config)
- Remove unused import (assertNull)
- Fix VariableDeclarationUsageDistance by moving declarations
  closer to first usage

Fixes: third round of CI failures in PR apache#5260 after javadoc fixes.
checkstyle groups order: org.apache.eventmesh,org.apache,java,javax,org,io,net,junit,com,lombok

- Move org.junit.jupiter.* (org group) before io.cloudevents.* (io group) in test files
- Separate com.fasterxml and lombok into their own groups with blank lines in A2AGatewayService

Fixes: 4th round checkstyle violations in PR apache#5260
Increase TTL from 200ms to 500ms and cleanup interval from 100ms to 250ms
to prevent race condition on slow CI runners (macOS). Add assertion messages
for better debugging.

The test was flaky because Thread.sleep(100) could exceed 100ms on overloaded
CI runners, causing the cleanup to expire the card before heartbeat check.

Fixes: 5th round CI failure on macOS in PR apache#5260
- A2AMessageTransport.java: add Javadoc for MessageCallback interface
- A2AClient.java: remove unused java.util.function.Consumer import
- A2AClient.java: fix import order (org.apache.eventmesh, org.apache,
  java, org, io, com per checkstyle groups)
- A2AClient.java: add Javadoc for RequestHandler interface

Fixes: 6th round CI failure (protocol-a2a checkstyle) in PR apache#5260
- Move static imports to top (option=top in checkstyle)
- Fix alphabetical order: assertFalse before assertNotNull

Fixes: 7th round CI failure (protocol-a2a checkstyleTest) in PR apache#5260

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces an A2A (Agent-to-Agent) Gateway Runtime in eventmesh-runtime (Netty HTTP REST + SSE), adds an in-memory task lifecycle registry with TTL cleanup, and provides a typed Java SDK (A2AClient) plus topic conventions (A2ATopicFactory) in eventmesh-protocol-a2a. It also adds extensive unit/integration tests and updates A2A protocol documentation to cover the new gateway architecture.

Changes:

  • Add A2A Gateway server/runtime: REST endpoints, SSE streaming, task lifecycle + TTL cleanup, in-memory transport.
  • Add protocol-side utilities: topic factory + parsing, transport abstraction, Java SDK client (HTTP + SSE + heartbeat).
  • Add tests (unit + HTTP integration) and update docs/architecture/design summaries.

Reviewed changes

Copilot reviewed 27 out of 27 changed files in this pull request and generated 9 comments.

Show a summary per file
File Description
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/TaskRegistryTest.java Unit tests for task state machine, parent/child tracking, timestamps, concurrency.
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/InMemoryA2AMessageTransportTest.java Tests for in-memory pub/sub delivery, wildcard matching, unsubscribe.
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/AgentCardTestUtils.java Test helper to build schema-valid AgentCard instances.
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/A2AGatewayServiceTest.java Service-layer tests: submit, cancel, timeout, SSE subscriber notifications, validation.
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/A2AGatewayEndToEndTest.java In-process end-to-end tests across registry + service + agent-card TTL behavior.
eventmesh-runtime/src/test/java/org/apache/eventmesh/runtime/a2a/A2AClientServerIntegrationTest.java Real HTTP client/server integration coverage (REST, CORS, SSE, pagination, heartbeat).
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/TaskRegistry.java In-memory task lifecycle registry with TTL cleanup scheduler.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/InMemoryA2AMessageTransport.java In-memory implementation of A2AMessageTransport with “+” wildcard support.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2APublishSubscribeService.java AgentCard registry enhanced with TTL/heartbeat cleanup and agent registration check.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2AGatewayService.java Core gateway orchestration: task submission, response handling, status subscribers, timeout.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2AGatewayServer.java Standalone Netty HTTP server wiring gateway components + mock “weather-agent”.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2AGatewayHttpHandler.java REST API handler: tasks endpoints, list/pagination, SSE streaming, heartbeat, CORS.
eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/a2a/A2ACardHttpHandler.java AgentCard HTTP handler updated to support CORS preflight and CORS headers.
eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/test/java/org/apache/eventmesh/protocol/a2a/A2ATopicFactoryTest.java Topic factory generation + parsing tests and validation checks.
eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2ATopicFactory.java Standard topic naming + wildcard topics + parsing into structured components.
eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AMessageTransport.java Transport abstraction interface for pub/sub (CloudEvents).
eventmesh-protocol-plugin/eventmesh-protocol-a2a/src/main/java/org/apache/eventmesh/protocol/a2a/A2AClient.java Typed Java SDK: card registration, task ops, heartbeat, SSE streaming, optional transport mode.
eventmesh-protocol-plugin/eventmesh-protocol-a2a/build.gradle Adds Apache HttpClient dependency for the SDK implementation.
eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/README.md Demo documentation: architecture diagram, curl examples, SDK usage, test commands.
eventmesh-examples/src/main/java/org/apache/eventmesh/a2a/demo/gateway/A2AGatewayDemo.java Example client demonstrating SDK usage against the running gateway server.
docs/a2a-protocol/TEST_RESULTS.md Updates test results summary to include gateway runtime coverage.
docs/a2a-protocol/README.md Adds gateway runtime architecture and REST/SSE usage (CN).
docs/a2a-protocol/README_EN.md Adds gateway runtime architecture and REST/SSE usage (EN).
docs/a2a-protocol/IMPLEMENTATION_SUMMARY.md Implementation summary expanded to include gateway runtime and design decisions (CN).
docs/a2a-protocol/IMPLEMENTATION_SUMMARY_EN.md Implementation summary expanded to include gateway runtime and design decisions (EN).
docs/a2a-protocol/eventmesh-a2a-design.md Design doc updated for gateway runtime architecture and SDK/transport components.
docs/a2a-protocol/ARCHITECTURE.md Architecture doc updated to document gateway runtime and REST/SSE endpoints.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Map.Entry<String, TaskEntry> entry = it.next();
TaskEntry te = entry.getValue();
if (isTerminalState(te.getState()) && (now - te.getUpdatedAt()) > taskTtlMs) {
it.remove();
Map.Entry<AgentIdentity, RegisteredCard> entry = it.next();
RegisteredCard rc = entry.getValue();
if (now - rc.lastHeartbeat > cardTtlMs) {
it.remove();
Comment on lines +226 to +233
} else {
// Async mode: return task ID immediately
Map<String, Object> response = new LinkedHashMap<>();
response.put("taskId", taskId);
response.put("status", "accepted");
response.put("message", "Task submitted. Use GET /a2a/tasks/" + taskId + " to check status.");
return jsonResponse(HttpResponseStatus.ACCEPTED, objectMapper.writeValueAsString(response));
}
Comment on lines +322 to +324
String timeoutStr = getQueryParam(decoder, "timeout");
long timeout = timeoutStr != null ? Long.parseLong(timeoutStr) : DEFAULT_WAIT_TIMEOUT_MS;

Comment on lines +259 to +261
}

int total = filteredTasks.size();
}

private FullHttpResponse jsonError(HttpResponseStatus status, String message) {
String body = "{\"error\":\"" + message + "\"}";
Comment on lines +135 to +138
String[] parts = agentName.split("/");
String orgId = parts.length >= 1 ? parts[0] : "default";
String unitId = parts.length >= 2 ? parts[1] : "default";
String agentId = parts.length >= 3 ? parts[2] : agentName;
Comment on lines +173 to +176
String[] parts = agentName.split("/");
String orgId = parts.length >= 1 ? parts[0] : "default";
String unitId = parts.length >= 2 ? parts[1] : "default";
String agentId = parts.length >= 3 ? parts[2] : agentName;
Comment on lines +368 to +371
this.transport = transport;
String requestTopic = A2ATopicFactory.agentRequestTopic(namespace, agentName);
requestSubscriptionId = transport.subscribe(requestTopic, this::handleRequestMessage);
log.info("Subscribed to agent request topic: {}", requestTopic);

@xwm1992 xwm1992 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@xwm1992 xwm1992 merged commit 2527428 into apache:master Jun 29, 2026
13 of 14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants