Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ protected PortObject[] execute(final PortObject[] inObjects, final ExecutionCont

/**
* Throw a nicer error message if the exception we are seeing is an "Error while obtaining a new communication
* channel" or "Cannot obtain a new communication channel" (CallbackClient already shut down)
*
* @param ex The exception
* @throws KNIMEException A more human-readable exception
Expand All @@ -245,6 +245,17 @@ private void handleNewCommunicationChannelError(final Py4JException ex) throws K
"The Python process we prepared in the background got killed. Try again to start a new one.");
throw KNIMEException.of(messageBuilder.build().orElseThrow(), ex);
}
// Handle the case where the CallbackClient was already shut down (no cause)
if (ex.getCause() == null && ex.getMessage() != null
&& ex.getMessage().startsWith("Cannot obtain a new communication channel")) {
var messageBuilder = createMessageBuilder();
messageBuilder.withSummary("The Python communication channel was unexpectedly shut down before use. "
+ "Details: " + ex.getMessage());
messageBuilder.addResolutions("This is a known intermittent issue. Please try executing again.",
"Check the KNIME log for entries around this error for more information about "
+ "what shut down the communication channel.");
throw KNIMEException.of(messageBuilder.build().orElseThrow(), ex);
}
}

private void runUserScript(final PythonScriptingSession session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ final class PythonScriptingSession implements AsynchronousCloseable<IOException>
m_fileStoreHandlerSupplier = fileStoreHandlerSupplier;
m_gateway = createGateway(pythonCommand);
m_entryPoint = m_gateway.getEntryPoint();
LOGGER.infoWithFormat("PythonScriptingSession created gateway (hash=%s) on thread '%s'",
Integer.toHexString(System.identityHashCode(m_gateway)), Thread.currentThread().getName());
m_tableConverter = new PythonArrowTableConverter(EXECUTOR_SERVICE, ARROW_STORE_FACTORY,
fileStoreHandlerSupplier.getWriteFileStoreHandler());
m_outputRedirector = PythonGatewayUtils.redirectGatewayOutput(m_gateway, LOGGER::info, LOGGER::info);
Expand Down Expand Up @@ -495,6 +497,13 @@ private void setCurrentWorkingDirectory() {
Optional.ofNullable(workflowDirRef).map(r -> r.getFile().toString())
.ifPresent(m_entryPoint::setCurrentWorkingDirectory);
} catch (Py4JException ex) {
// Log detailed diagnostics if the CallbackClient was already shut down
if (ex.getCause() == null && ex.getMessage() != null
&& ex.getMessage().startsWith("Cannot obtain a new communication channel")) {
LOGGER.error("CallbackClient was shut down before first use. " + "Gateway hash="
+ Integer.toHexString(System.identityHashCode(m_gateway)) + ", thread='"
+ Thread.currentThread().getName() + "'", ex);
}
PythonProcessTerminatedException.throwIfTerminated(m_gateway, ex);
throw ex;
} catch (Exception ex) { // NOSONAR: We want to catch any exception here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public T getEntryPoint() {
@Override
public void close() throws IOException {
if (m_clientServer != null) {
LOGGER.infoWithFormat("Closing PythonGateway (PID=%s) from thread '%s'", m_pid,
Thread.currentThread().getName());
m_entryPoint = null;
m_clientServer.shutdown();
m_clientServer = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,18 +182,21 @@ public void notify(final EventObject o) {
if (o instanceof PhaseEvent && ((PhaseEvent)o).getPhaseId().equals(PhaseSetFactory.PHASE_INSTALL)
&& ((PhaseEvent)o).getType() == PhaseEvent.TYPE_START) {
// lock if we enter the "install" phase
LOGGER.info("Blocking Python process startup during installation");
LOGGER.info("Blocking Python process startup during installation (thread='"
+ Thread.currentThread().getName() + "')");
INSTANCE.blockPythonCreation();
} else if (o instanceof PhaseEvent && ((PhaseEvent)o).getPhaseId().equals(PhaseSetFactory.PHASE_CONFIGURE)
&& ((PhaseEvent)o).getType() == PhaseEvent.TYPE_START) {
// "configure" is the normal phase after install, so we can unlock Python processes again
LOGGER.info("Allowing Python process startup again after installation");
LOGGER.info("Allowing Python process startup again after installation (thread='"
+ Thread.currentThread().getName() + "')");
INSTANCE.allowPythonCreation();
} else if (o instanceof RollbackOperationEvent && !INSTANCE.isPythonGatewayCreationAllowed()) {
// According to org.eclipse.equinox.internal.p2.engine.Engine.perform() -> L92,
// a RollbackOperationEvent will be fired if an operation failed, and this event is only fired in that case,
// so we unlock if we are currently locked.
LOGGER.info("Allowing Python process startup again after installation failed");
LOGGER.info("Allowing Python process startup again after installation failed (thread='"
+ Thread.currentThread().getName() + "')");
INSTANCE.allowPythonCreation();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ void clear() throws IOException {
return;
}

LOGGER.error("Found running Python processes. Aborting them to allow installation process. "
+ "If this leads to failures in node execution, "
+ "please restart those nodes once the installation has finished");
LOGGER.errorWithFormat(
"Found running Python processes (%d). Aborting them to allow installation process. "
+ "If this leads to failures in node execution, "
+ "please restart those nodes once the installation has finished. Triggered from thread '%s'.",
m_openGateways.size(), Thread.currentThread().getName());

var exceptions = new ArrayList<Exception>();
for (var gateway : m_openGateways) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,13 @@ public void onPythonGatewayCreationGateOpen() {

/**
 * Evicts all queued gateways when the Python gateway creation gate closes.
 *
 * The gateways are collected into a list first so that the number about to be evicted can be logged
 * (together with the name of the calling thread) before they are handed to {@code evictGateways}
 * exactly once.
 */
@Override
public void onPythonGatewayCreationGateClose() {
    // Flatten the per-key collections held in m_gateways into a single snapshot list.
    var gatewaysToEvict = m_gateways.values().stream()//
        .flatMap(Collection::stream)//
        .collect(Collectors.toList());
    // Log before evicting so the reported count reflects what is actually about to be evicted.
    LOGGER.warnWithFormat(
        "PythonGatewayCreationGate closed: evicting %d queued gateways from thread '%s'",
        gatewaysToEvict.size(), Thread.currentThread().getName());
    evictGateways(gatewaysToEvict);
}
});
}
Expand Down
Loading