323 lines
11 KiB
JavaScript
323 lines
11 KiB
JavaScript
import { env } from '../config/env.js';
|
||
import {
|
||
getUserScenarioPermission,
|
||
getScenarioById,
|
||
getScenarioStep,
|
||
getFirstScenarioStep,
|
||
createGeneration,
|
||
updateGeneration,
|
||
getGenerationByUuid,
|
||
} from '../repositories/scenario.repository.js';
|
||
import { pool } from '../db.js';
|
||
|
||
// Экспорт для использования в routes
|
||
export { getScenarioStep };
|
||
|
||
/**
|
||
* Валидация данных по JSON Schema (строгая - все required поля)
|
||
*/
|
||
function validateInputSchema(inputSchema, data) {
|
||
const errors = [];
|
||
|
||
if (!inputSchema || inputSchema === '{}') {
|
||
return { valid: true, errors: [] };
|
||
}
|
||
|
||
const schema = typeof inputSchema === 'string' ? JSON.parse(inputSchema) : inputSchema;
|
||
|
||
// Проверка required полей
|
||
if (schema.required && Array.isArray(schema.required)) {
|
||
for (const field of schema.required) {
|
||
if (data[field] === undefined || data[field] === null) {
|
||
errors.push(`Missing required field: ${field}`);
|
||
}
|
||
}
|
||
}
|
||
|
||
// Проверка типов полей
|
||
if (schema.properties) {
|
||
for (const [field, fieldSchema] of Object.entries(schema.properties)) {
|
||
const value = data[field];
|
||
if (value === undefined || value === null) {
|
||
continue; // already checked in required
|
||
}
|
||
|
||
if (fieldSchema.type === 'string' && typeof value !== 'string') {
|
||
errors.push(`Field '${field}' must be a string`);
|
||
} else if (fieldSchema.type === 'boolean' && typeof value !== 'boolean') {
|
||
errors.push(`Field '${field}' must be a boolean`);
|
||
} else if (fieldSchema.type === 'number' && typeof value !== 'number') {
|
||
errors.push(`Field '${field}' must be a number`);
|
||
} else if (fieldSchema.type === 'integer' && (!Number.isInteger(value))) {
|
||
errors.push(`Field '${field}' must be an integer`);
|
||
} else if (fieldSchema.type === 'array' && !Array.isArray(value)) {
|
||
errors.push(`Field '${field}' must be an array`);
|
||
} else if (fieldSchema.type === 'object' && (typeof value !== 'object' || Array.isArray(value))) {
|
||
errors.push(`Field '${field}' must be an object`);
|
||
}
|
||
|
||
// Проверка minLength для string
|
||
if (fieldSchema.type === 'string' && typeof value === 'string' && fieldSchema.minLength !== undefined) {
|
||
if (value.length < fieldSchema.minLength) {
|
||
errors.push(`Field '${field}' must have at least ${fieldSchema.minLength} characters`);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return {
|
||
valid: errors.length === 0,
|
||
errors,
|
||
};
|
||
}
|
||
|
||
/**
|
||
* Проверка прав доступа на запуск сценария
|
||
*/
|
||
export async function assertUserCanStartScenario({ userId, scenarioId }) {
|
||
const scenario = await getScenarioById(scenarioId);
|
||
if (!scenario) {
|
||
const error = new Error(`Scenario '${scenarioId}' not found`);
|
||
error.code = 'SCENARIO_NOT_FOUND';
|
||
error.status = 404;
|
||
throw error;
|
||
}
|
||
|
||
if (scenario.status !== 'active') {
|
||
const error = new Error(`Scenario '${scenarioId}' is not active (status: ${scenario.status})`);
|
||
error.code = 'SCENARIO_NOT_ACTIVE';
|
||
error.status = 403;
|
||
throw error;
|
||
}
|
||
|
||
// Публичные сценарии доступны всем авторизованным пользователям
|
||
if (!scenario.is_public) {
|
||
const permission = await getUserScenarioPermission(userId, scenarioId);
|
||
if (!permission || !permission.can_start) {
|
||
const error = new Error(`User '${userId}' does not have permission to start scenario '${scenarioId}'`);
|
||
error.code = 'SCENARIO_ACCESS_DENIED';
|
||
error.status = 403;
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* Проверка прав доступа на выполнение шага
|
||
*/
|
||
export async function assertUserCanExecuteStep({ userId, scenarioId, stepId }) {
|
||
const scenario = await getScenarioById(scenarioId);
|
||
if (!scenario) {
|
||
const error = new Error(`Scenario '${scenarioId}' not found`);
|
||
error.code = 'SCENARIO_NOT_FOUND';
|
||
error.status = 404;
|
||
throw error;
|
||
}
|
||
|
||
const step = await getScenarioStep(scenarioId, stepId);
|
||
if (!step) {
|
||
const error = new Error(`Step '${stepId}' not found in scenario '${scenarioId}'`);
|
||
error.code = 'STEP_NOT_FOUND';
|
||
error.status = 404;
|
||
throw error;
|
||
}
|
||
|
||
if (step.status !== 'active') {
|
||
const error = new Error(`Step '${stepId}' is not active (status: ${step.status})`);
|
||
error.code = 'STEP_NOT_ACTIVE';
|
||
error.status = 403;
|
||
throw error;
|
||
}
|
||
|
||
// Публичные сценарии доступны всем авторизованным пользователям
|
||
if (!scenario.is_public) {
|
||
const permission = await getUserScenarioPermission(userId, scenarioId);
|
||
if (!permission || !permission.can_execute) {
|
||
const error = new Error(`User '${userId}' does not have permission to execute steps in scenario '${scenarioId}'`);
|
||
error.code = 'SCENARIO_ACCESS_DENIED';
|
||
error.status = 403;
|
||
throw error;
|
||
}
|
||
}
|
||
|
||
return true;
|
||
}
|
||
|
||
/**
|
||
* Запуск сценария
|
||
*/
|
||
export async function startScenario({ userId, scenarioId, input, user }) {
|
||
// Валидация input для start - body должен быть пустым или минимальным
|
||
// Для start сценария проверяем, что нет лишних данных (body должен быть пустым {})
|
||
if (input && Object.keys(input).length > 0) {
|
||
const error = new Error('Start scenario body must be empty');
|
||
error.code = 'INVALID_START_BODY';
|
||
error.status = 400;
|
||
throw error;
|
||
}
|
||
|
||
const firstStep = await getFirstScenarioStep(scenarioId);
|
||
if (!firstStep) {
|
||
const error = new Error(`No active steps found in scenario '${scenarioId}'`);
|
||
error.code = 'NO_ACTIVE_STEPS';
|
||
error.status = 400;
|
||
throw error;
|
||
}
|
||
|
||
// Создаем generation
|
||
const generation = await createGeneration({
|
||
userId,
|
||
scenarioId,
|
||
authSessionId: user?.sessionId || null,
|
||
requestPayload: {},
|
||
currentStepId: firstStep.step_id,
|
||
});
|
||
|
||
// Формируем payload для n8n
|
||
// generationUuid НЕ отправляем - n8n должен сгенерировать свой и вернуть в ответе
|
||
const n8nPayload = {
|
||
meta: {
|
||
userId: user.id,
|
||
userEmail: user.email,
|
||
sessionId: user.sessionId,
|
||
scenarioId,
|
||
},
|
||
body: {},
|
||
};
|
||
|
||
// Вызов n8n webhook
|
||
const n8nUrl = `${env.N8N_BASE_URL}/webhook/scenario/${scenarioId}/start`;
|
||
const n8nResponse = await callN8nWebhook(n8nUrl, n8nPayload);
|
||
|
||
// Обновляем generation с external_run_id и generation_uuid от n8n
|
||
const updateData = {};
|
||
if (n8nResponse?.runId) {
|
||
updateData.externalRunId = n8nResponse.runId;
|
||
}
|
||
if (n8nResponse?.generationUuid) {
|
||
// Если n8n вернул свой generationUuid, используем его
|
||
// Но это маловероятно - скорее всего мы уже сгенерировали свой
|
||
updateData.generationUuid = n8nResponse.generationUuid;
|
||
}
|
||
|
||
if (Object.keys(updateData).length > 0) {
|
||
await updateGeneration(generation.generation_uuid, updateData);
|
||
}
|
||
|
||
return {
|
||
generationUuid: generation.generation_uuid,
|
||
status: generation.status,
|
||
currentStepId: firstStep.step_id,
|
||
};
|
||
}
|
||
|
||
/**
|
||
* Выполнение шага сценария
|
||
* BFF только валидирует входные данные и проксирует запрос в n8n.
|
||
* n8n отвечает за запись в БД (generation_steps, generations).
|
||
*/
|
||
export async function executeStep({ userId, scenarioId, stepId, input, user }) {
|
||
const step = await getScenarioStep(scenarioId, stepId);
|
||
|
||
// Валидация input по input_schema из БД
|
||
const validation = validateInputSchema(step.input_schema, input);
|
||
if (!validation.valid) {
|
||
const error = new Error(`Invalid input: ${validation.errors.join(', ')}`);
|
||
error.code = 'INVALID_INPUT';
|
||
error.status = 400;
|
||
error.details = validation.errors;
|
||
throw error;
|
||
}
|
||
|
||
// Если фронт передал явный generationUuid — берём его (защита от race condition
|
||
// при параллельных submitStep-вызовах от одного пользователя в одном сценарии).
|
||
// Иначе fallback на самую свежую активную generation (back-compat).
|
||
const explicitUuid = input && typeof input.generationUuid === 'string' ? input.generationUuid : null;
|
||
let generation;
|
||
if (explicitUuid) {
|
||
const r = await pool.query(
|
||
`SELECT generation_uuid, current_step_id, status FROM uno_bff.generations WHERE user_id = $1 AND scenario_id = $2 AND generation_uuid = $3::uuid LIMIT 1`,
|
||
[userId, scenarioId, explicitUuid],
|
||
);
|
||
generation = r.rows[0];
|
||
if (input && 'generationUuid' in input) { delete input.generationUuid; }
|
||
} else {
|
||
const r = await pool.query(
|
||
`SELECT generation_uuid, current_step_id, status FROM uno_bff.generations WHERE user_id = $1 AND scenario_id = $2 AND status IN ('running', 'waiting_for_input') ORDER BY created_at DESC LIMIT 1`,
|
||
[userId, scenarioId],
|
||
);
|
||
generation = r.rows[0];
|
||
}
|
||
|
||
if (!generation) {
|
||
const error = new Error(`No active generation found for scenario '${scenarioId}'`);
|
||
error.code = 'GENERATION_NOT_FOUND';
|
||
error.status = 400;
|
||
throw error;
|
||
}
|
||
|
||
// Формируем payload для n8n
|
||
const n8nPayload = {
|
||
meta: {
|
||
userId: user.id,
|
||
userEmail: user.email,
|
||
scenarioId,
|
||
stepId,
|
||
generationUuid: generation.generation_uuid,
|
||
},
|
||
body: input,
|
||
};
|
||
|
||
// Вызов n8n webhook - n8n сам пишет в БД
|
||
const n8nUrl = `${env.N8N_BASE_URL}/webhook/scenario/${scenarioId}/step/${stepId}`;
|
||
const n8nResponse = await callN8nWebhook(n8nUrl, n8nPayload);
|
||
|
||
// Возвращаем ответ от n8n
|
||
return {
|
||
generationUuid: generation.generation_uuid,
|
||
stepState: n8nResponse?.stepState || 'processing',
|
||
nextStepId: n8nResponse?.nextStepId || stepId,
|
||
n8nResponse: n8nResponse?.data || {},
|
||
};
|
||
}
|
||
|
||
/**
|
||
* Вызов n8n webhook
|
||
*/
|
||
async function callN8nWebhook(url, payload) {
|
||
try {
|
||
const response = await fetch(url, {
|
||
method: 'POST',
|
||
headers: {
|
||
'Content-Type': 'application/json',
|
||
},
|
||
body: JSON.stringify(payload),
|
||
});
|
||
|
||
if (!response.ok) {
|
||
const errorText = await response.text();
|
||
const error = new Error(`n8n webhook failed: ${response.status} ${response.statusText}`);
|
||
error.code = 'N8N_WEBHOOK_ERROR';
|
||
error.status = response.status;
|
||
error.n8nResponse = errorText;
|
||
throw error;
|
||
}
|
||
|
||
const data = await response.json();
|
||
return {
|
||
runId: data.runId || data.externalRunId || null,
|
||
data,
|
||
};
|
||
} catch (err) {
|
||
if (err.code === 'N8N_WEBHOOK_ERROR') {
|
||
throw err;
|
||
}
|
||
const error = new Error(`Failed to call n8n webhook: ${err.message}`);
|
||
error.code = 'N8N_CONNECTION_ERROR';
|
||
error.status = 503;
|
||
throw error;
|
||
}
|
||
}
|