Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.modelcontextprotocol.client.LifecycleInitializer.Initialization;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
Expand All @@ -30,16 +33,14 @@
import io.modelcontextprotocol.spec.McpSchema.ElicitResult;
import io.modelcontextprotocol.spec.McpSchema.GetPromptRequest;
import io.modelcontextprotocol.spec.McpSchema.GetPromptResult;
import io.modelcontextprotocol.util.ToolNameValidator;
import io.modelcontextprotocol.spec.McpSchema.ListPromptsResult;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSchema.PaginatedRequest;
import io.modelcontextprotocol.spec.McpSchema.Root;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.ToolNameValidator;
import io.modelcontextprotocol.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -107,6 +108,9 @@ public class McpAsyncClient {
public static final TypeRef<McpSchema.ProgressNotification> PROGRESS_NOTIFICATION_TYPE_REF = new TypeRef<>() {
};

public static final TypeRef<McpSchema.ElicitationCompleteNotification> ELICITATION_COMPLETE_NOTIFICATION_TYPE_REF = new TypeRef<>() {
};

public static final String NEGOTIATED_PROTOCOL_VERSION = "io.modelcontextprotocol.client.negotiated-protocol-version";

/**
Expand Down Expand Up @@ -297,6 +301,16 @@ public class McpAsyncClient {
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_PROGRESS,
asyncProgressNotificationHandler(progressConsumersFinal));

// Elicitation Complete Notification
List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumersFinal = new ArrayList<>();
elicitationCompleteConsumersFinal
.add((notification) -> Mono.fromRunnable(() -> logger.debug("Elicitation complete: {}", notification)));
if (!Utils.isEmpty(features.elicitationCompleteConsumers())) {
elicitationCompleteConsumersFinal.addAll(features.elicitationCompleteConsumers());
}
notificationHandlers.put(McpSchema.METHOD_NOTIFICATION_ELICITATION_COMPLETE,
asyncElicitationCompleteNotificationHandler(elicitationCompleteConsumersFinal));

Function<Initialization, Mono<Void>> postInitializationHook = init -> {

if (init.initializeResult().capabilities().tools() == null || !enableCallToolSchemaCaching) {
Expand Down Expand Up @@ -1037,6 +1051,18 @@ private NotificationHandler asyncProgressNotificationHandler(
};
}

private NotificationHandler asyncElicitationCompleteNotificationHandler(
List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumers) {
return params -> {
McpSchema.ElicitationCompleteNotification notification = transport.unmarshalFrom(params,
ELICITATION_COMPLETE_NOTIFICATION_TYPE_REF);

return Flux.fromIterable(elicitationCompleteConsumers)
.flatMap(consumer -> consumer.apply(notification))
.then();
};
}

/**
* This method is package-private and used for test only. Should not be called by user
* code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@

package io.modelcontextprotocol.client;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.McpJsonDefaults;
import io.modelcontextprotocol.json.schema.JsonSchemaValidator;
Expand All @@ -20,15 +29,6 @@
import io.modelcontextprotocol.util.Assert;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Factory class for creating Model Context Protocol (MCP) clients. MCP is a protocol that
* enables AI models to interact with external tools and resources through a standardized
Expand Down Expand Up @@ -189,6 +189,8 @@ class SyncSpec {

private Function<ElicitRequest, ElicitResult> elicitationHandler;

private final List<Consumer<McpSchema.ElicitationCompleteNotification>> elicitationCompleteConsumers = new ArrayList<>();

private Supplier<McpTransportContext> contextProvider = () -> McpTransportContext.EMPTY;

private JsonSchemaValidator jsonSchemaValidator;
Expand Down Expand Up @@ -318,6 +320,21 @@ public SyncSpec elicitation(Function<ElicitRequest, ElicitResult> elicitationHan
return this;
}

/**
* Adds a consumer to be notified when an elicitation complete notification is
* received from the server. This allows the client to react when an out-of-band
* URL elicitation interaction has completed.
* @param consumer A consumer that receives the elicitation complete notification.
* Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if consumer is null
*/
public SyncSpec elicitationCompleteConsumer(Consumer<McpSchema.ElicitationCompleteNotification> consumer) {
Assert.notNull(consumer, "Elicitation complete consumer must not be null");
this.elicitationCompleteConsumers.add(consumer);
return this;
}

/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down Expand Up @@ -488,7 +505,7 @@ public McpSyncClient build() {
McpClientFeatures.Sync syncFeatures = new McpClientFeatures.Sync(this.clientInfo, this.capabilities,
this.roots, this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers, this.samplingHandler,
this.elicitationHandler, this.enableCallToolSchemaCaching);
this.elicitationHandler, this.enableCallToolSchemaCaching, this.elicitationCompleteConsumers);

McpClientFeatures.Async asyncFeatures = McpClientFeatures.Async.fromSync(syncFeatures);

Expand Down Expand Up @@ -545,6 +562,8 @@ class AsyncSpec {

private Function<ElicitRequest, Mono<ElicitResult>> elicitationHandler;

private final List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumers = new ArrayList<>();

private JsonSchemaValidator jsonSchemaValidator;

private boolean enableCallToolSchemaCaching = false; // Default to false
Expand Down Expand Up @@ -672,6 +691,22 @@ public AsyncSpec elicitation(Function<ElicitRequest, Mono<ElicitResult>> elicita
return this;
}

/**
* Adds a consumer to be notified when an elicitation complete notification is
* received from the server. This allows the client to react when an out-of-band
* URL elicitation interaction has completed.
* @param consumer A function that receives the elicitation complete notification
* and returns a Mono signaling completion. Must not be null.
* @return This builder instance for method chaining
* @throws IllegalArgumentException if consumer is null
*/
public AsyncSpec elicitationCompleteConsumer(
Function<McpSchema.ElicitationCompleteNotification, Mono<Void>> consumer) {
Assert.notNull(consumer, "Elicitation complete consumer must not be null");
this.elicitationCompleteConsumers.add(consumer);
return this;
}

/**
* Adds a consumer to be notified when the available tools change. This allows the
* client to react to changes in the server's tool capabilities, such as tools
Expand Down Expand Up @@ -833,7 +868,8 @@ public McpAsyncClient build() {
new McpClientFeatures.Async(this.clientInfo, this.capabilities, this.roots,
this.toolsChangeConsumers, this.resourcesChangeConsumers, this.resourcesUpdateConsumers,
this.promptsChangeConsumers, this.loggingConsumers, this.progressConsumers,
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching));
this.samplingHandler, this.elicitationHandler, this.enableCallToolSchemaCaching,
this.elicitationCompleteConsumers));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ class McpClientFeatures {
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
* @param elicitationCompleteConsumers the elicitation complete notification
* consumers.
*/
record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Function<List<McpSchema.Tool>, Mono<Void>>> toolsChangeConsumers,
Expand All @@ -73,7 +75,8 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler,
boolean enableCallToolSchemaCaching) {
boolean enableCallToolSchemaCaching,
List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumers) {

/**
* Create an instance and validate the arguments.
Expand All @@ -87,6 +90,8 @@ record Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
* @param elicitationCompleteConsumers the elicitation complete notification
* consumers.
*/
public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots,
Expand All @@ -98,7 +103,8 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
List<Function<McpSchema.ProgressNotification, Mono<Void>>> progressConsumers,
Function<McpSchema.CreateMessageRequest, Mono<McpSchema.CreateMessageResult>> samplingHandler,
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler,
boolean enableCallToolSchemaCaching) {
boolean enableCallToolSchemaCaching,
List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumers) {

Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
Expand All @@ -119,6 +125,8 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
this.elicitationCompleteConsumers = elicitationCompleteConsumers != null ? elicitationCompleteConsumers
: List.of();
}

/**
Expand All @@ -135,7 +143,7 @@ public Async(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities c
Function<McpSchema.ElicitRequest, Mono<McpSchema.ElicitResult>> elicitationHandler) {
this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
elicitationHandler, false);
elicitationHandler, false, List.of());
}

/**
Expand Down Expand Up @@ -191,10 +199,17 @@ public static Async fromSync(Sync syncSpec) {
.fromCallable(() -> syncSpec.elicitationHandler().apply(r))
.subscribeOn(Schedulers.boundedElastic());

List<Function<McpSchema.ElicitationCompleteNotification, Mono<Void>>> elicitationCompleteConsumers = new ArrayList<>();
for (Consumer<McpSchema.ElicitationCompleteNotification> consumer : syncSpec
.elicitationCompleteConsumers()) {
elicitationCompleteConsumers.add(n -> Mono.<Void>fromRunnable(() -> consumer.accept(n))
.subscribeOn(Schedulers.boundedElastic()));
}

return new Async(syncSpec.clientInfo(), syncSpec.clientCapabilities(), syncSpec.roots(),
toolsChangeConsumers, resourcesChangeConsumers, resourcesUpdateConsumers, promptsChangeConsumers,
loggingConsumers, progressConsumers, samplingHandler, elicitationHandler,
syncSpec.enableCallToolSchemaCaching);
syncSpec.enableCallToolSchemaCaching(), elicitationCompleteConsumers);
}
}

Expand All @@ -213,6 +228,8 @@ public static Async fromSync(Sync syncSpec) {
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
* @param elicitationCompleteConsumers the elicitation complete notification
* consumers.
*/
public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
Expand All @@ -223,22 +240,11 @@ public record Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabili
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler,
boolean enableCallToolSchemaCaching) {
boolean enableCallToolSchemaCaching,
List<Consumer<McpSchema.ElicitationCompleteNotification>> elicitationCompleteConsumers) {

/**
* Create an instance and validate the arguments.
* @param clientInfo the client implementation information.
* @param clientCapabilities the client capabilities.
* @param roots the roots.
* @param toolsChangeConsumers the tools change consumers.
* @param resourcesChangeConsumers the resources change consumers.
* @param resourcesUpdateConsumers the resource update consumers.
* @param promptsChangeConsumers the prompts change consumers.
* @param loggingConsumers the logging consumers.
* @param progressConsumers the progress consumers.
* @param samplingHandler the sampling handler.
* @param elicitationHandler the elicitation handler.
* @param enableCallToolSchemaCaching whether to enable call tool schema caching.
*/
public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities clientCapabilities,
Map<String, McpSchema.Root> roots, List<Consumer<List<McpSchema.Tool>>> toolsChangeConsumers,
Expand All @@ -249,7 +255,8 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
List<Consumer<McpSchema.ProgressNotification>> progressConsumers,
Function<McpSchema.CreateMessageRequest, McpSchema.CreateMessageResult> samplingHandler,
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler,
boolean enableCallToolSchemaCaching) {
boolean enableCallToolSchemaCaching,
List<Consumer<McpSchema.ElicitationCompleteNotification>> elicitationCompleteConsumers) {

Assert.notNull(clientInfo, "Client info must not be null");
this.clientInfo = clientInfo;
Expand All @@ -270,6 +277,8 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
this.samplingHandler = samplingHandler;
this.elicitationHandler = elicitationHandler;
this.enableCallToolSchemaCaching = enableCallToolSchemaCaching;
this.elicitationCompleteConsumers = elicitationCompleteConsumers != null ? elicitationCompleteConsumers
: List.of();
}

/**
Expand All @@ -285,7 +294,7 @@ public Sync(McpSchema.Implementation clientInfo, McpSchema.ClientCapabilities cl
Function<McpSchema.ElicitRequest, McpSchema.ElicitResult> elicitationHandler) {
this(clientInfo, clientCapabilities, roots, toolsChangeConsumers, resourcesChangeConsumers,
resourcesUpdateConsumers, promptsChangeConsumers, loggingConsumers, List.of(), samplingHandler,
elicitationHandler, false);
elicitationHandler, false, List.of());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,15 @@

package io.modelcontextprotocol.server;

import io.modelcontextprotocol.common.McpTransportContext;
import java.util.ArrayList;
import java.util.Collections;

import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpLoggableSession;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.LoggingLevel;
import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification;
import io.modelcontextprotocol.spec.McpSession;
import io.modelcontextprotocol.util.Assert;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -152,10 +150,33 @@ public Mono<McpSchema.ElicitResult> createElicitation(McpSchema.ElicitRequest el
if (this.clientCapabilities.elicitation() == null) {
return Mono.error(new IllegalStateException("Client must be configured with elicitation capabilities"));
}
if ("url".equals(elicitRequest.mode()) && this.clientCapabilities.elicitation().url() == null) {
return Mono.error(new IllegalStateException(
"Client must be configured with URL elicitation capabilities to handle URL mode requests"));
}
return this.session.sendRequest(McpSchema.METHOD_ELICITATION_CREATE, elicitRequest,
ELICITATION_RESULT_TYPE_REF);
}

/**
* Sends an elicitation complete notification to the client, indicating that an
* out-of-band URL elicitation interaction has completed.
* @param notification The notification containing the elicitation ID
* @return A Mono that completes when the notification has been sent.
* @see McpSchema.ElicitationCompleteNotification
*/
public Mono<Void> sendElicitationComplete(McpSchema.ElicitationCompleteNotification notification) {
if (this.clientCapabilities == null) {
return Mono
.error(new IllegalStateException("Client must be initialized. Call the initialize method first!"));
}
if (this.clientCapabilities.elicitation() == null || this.clientCapabilities.elicitation().url() == null) {
return Mono.error(new IllegalStateException(
"Client must be configured with URL elicitation capabilities to receive elicitation complete notifications"));
}
return this.session.sendNotification(McpSchema.METHOD_NOTIFICATION_ELICITATION_COMPLETE, notification);
}

/**
* Retrieves the list of all roots provided by the client.
* @return A Mono that emits the list of roots result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ public McpSchema.ElicitResult createElicitation(McpSchema.ElicitRequest elicitRe
return this.exchange.createElicitation(elicitRequest).block();
}

/**
* Sends an elicitation complete notification to the client, indicating that an
* out-of-band URL elicitation interaction has completed.
* @param notification The notification containing the elicitation ID
* @see McpSchema.ElicitationCompleteNotification
*/
public void sendElicitationComplete(McpSchema.ElicitationCompleteNotification notification) {
this.exchange.sendElicitationComplete(notification).block();
}

/**
* Retrieves the list of all roots provided by the client.
* @return The list of roots result.
Expand Down
Loading