import ScenidCloudService from './ScenidCloudService.class.js'
// Shared source-list cache, keyed by serviceUrl, so every InsightsService
// instance talking to the same service reuses one name/id lookup table.
// Entry shape: { byName, byId, fetchedAt, promise }. While a first fetch is in
// flight the entry may be a bare placeholder `{ promise }` with no data yet.
const _sourceCache = new Map() // serviceUrl → { byName, byId, fetchedAt, promise }
// Entries older than this are refetched before use.
const CACHE_TTL = 5 * 60 * 1000
// Entries younger than this are trusted on a miss (no refetch), so repeated
// lookups of unknown names do not hammer the source-list endpoint.
const CACHE_FRESH_GUARD = 10 * 1000
// True only for entries that actually hold fetched data. A pending placeholder
// (`{ promise }`) has no `byName`/`fetchedAt`; without this check
// `Date.now() - undefined` is NaN and the placeholder would look non-stale.
const hasSourceData = entry => Boolean(entry?.byName) && Number.isFinite(entry.fetchedAt)
/** @returns {boolean} true when the entry is missing, still pending, or older than CACHE_TTL. */
const isCacheStale = entry => !hasSourceData(entry) || (Date.now() - entry.fetchedAt) >= CACHE_TTL
/** @returns {boolean} true when the entry holds data fetched less than CACHE_FRESH_GUARD ago. */
const isCacheFresh = entry => hasSourceData(entry) && (Date.now() - entry.fetchedAt) < CACHE_FRESH_GUARD
/**
 * Client for the Scenid Insights Service.
 *
 * Provides access to data sources and their documents via a factory pattern:
 * call `Source(nameOrId)` to get a scoped handle for a specific source, then
 * use the returned object's methods to query or mutate documents.
 *
 * Source names (e.g. `'Orders'`) and raw sourceIds are resolved to a sourceId
 * automatically using a per-service-URL cache backed by `GET /api/v2/source`.
 * The cache is populated on first use and refetched once it is older than
 * 5 minutes. A lookup miss also triggers one refetch, unless the cache was
 * fetched less than 10 seconds ago. A key that is still unknown after that
 * rejects with `insights/source-not-found`.
 *
 * Obtained via `sdk.asAdmin().Insights()` or `sdk.asUser(idToken).Insights()`.
 *
 * @extends ScenidCloudService
 *
 * @example
 * const insights = sdk.asAdmin().Insights()
 *
 * // Read source metadata by name
 * const { result } = await insights.Source('Orders').get()
 *
 * // Create a document
 * await insights.Source('Orders').Doc.create({ temperature: 21.5, unit: 'C' })
 */
class InsightsService extends ScenidCloudService {
  /**
   * @param {string|function(): Promise.<string>} tokenSource
   * @param {string} serviceUrl
   */
  constructor (tokenSource, serviceUrl) {
    super(tokenSource, serviceUrl)
    /**
     * Returns a scoped handle for the given source, identified by name or sourceId.
     *
     * Resolution is lazy: each method call resolves `nameOrId` through the shared
     * cache, trying it first as a source name and then as a raw sourceId. If it
     * is not found, the cache is refreshed once (unless it was fetched within the
     * last 10 seconds) and the lookup is retried. If it is still unknown, the
     * call rejects with `insights/source-not-found`.
     *
     * @param {string} nameOrId - The source's human-readable name (e.g. `'Orders'`) or raw sourceId.
     * @returns {SourceHandle}
     *
     * @example
     * const src = insights.Source('Orders')
     * const { result } = await src.get(true) // include formulator schema
     */
    this.Source = nameOrId => ({
      /**
       * Fetches metadata for the source, optionally including its formulator validation schema.
       *
       * @param {boolean} [withSchema=false] - When `true`, the response includes a `schema`
       *   object (`{ validation, render, translations }`) that can be passed directly to
       *   `FormulatorForm`.
       * @returns {Promise<ServiceResponse>}
       *
       * @example
       * const { result } = await insights.Source('Orders').get(true)
       * // result.schema.validation, result.schema.render, result.schema.translations
       */
      get: async (withSchema = false) => {
        const sourceId = await this._resolveSource(nameOrId)
        const query = withSchema ? '?withSchema=true' : ''
        return this.get(`/api/v2/source/${sourceId}${query}`)
      },
      /**
       * Runs an aggregation query against the source's data.
       *
       * Supports filtering, sorting, field selection, pagination via cursor (`pageKey`),
       * and Elasticsearch-style aggregations. Pass `raw: true` to query the raw index
       * (unprocessed field values as ingested).
       *
       * @param {Object} params - Aggregation parameters.
       * @param {number} [params.size] - Number of rows to return.
       * @param {string[]} [params.fields] - Fields to include in each row.
       * @param {Object} [params.sort] - Sort descriptor.
       * @param {Object} [params.filter] - Filter conditions.
       * @param {Object} [params.aggs] - Elasticsearch aggregation definitions.
       * @param {string} [params.pageKey] - Cursor for the next page (from a previous response's `nextPageKey`).
       * @param {boolean} [params.raw] - When `true`, queries the raw index.
       * @returns {Promise<ServiceResponse>}
       *
       * @example
       * const { result } = await insights.Source('Orders').aggregate({
       *   size: 100,
       *   filter: { status: 'active' },
       *   sort: { createdAt: 'desc' }
       * })
       */
      aggregate: async params => {
        const sourceId = await this._resolveSource(nameOrId)
        return this.post(`/api/v2/source/${sourceId}/aggregate`, params)
      },
      /**
       * Sends `docs` as the request body of `POST /api/v2/source/{sourceId}/push`.
       *
       * Unlike `Doc.batchInsert`, the payload is forwarded unchanged (no `{ data }`
       * envelope, no client-side validation).
       * NOTE(review): the payload shape the push endpoint expects is not visible
       * here — confirm against the service before relying on it.
       *
       * @param {*} docs - Request body forwarded verbatim.
       * @returns {Promise<ServiceResponse>}
       */
      push: async docs => {
        const sourceId = await this._resolveSource(nameOrId)
        return this.post(`/api/v2/source/${sourceId}/push`, docs)
      },
      /**
       * Document-level operations scoped to this source.
       * @namespace
       */
      Doc: {
        /**
         * Fetches a single document by its ID from the main (transformed) index.
         *
         * @param {string} docId - The document's identifier.
         * @returns {Promise<ServiceResponse>}
         */
        get: async docId => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.get(`/api/v2/source/${sourceId}/doc/${docId}`)
        },
        /**
         * Creates a new document in the source.
         *
         * The document is run through the source's field type transformations
         * and stored in both the raw index (original values) and the main index
         * (transformed values). The document ID is derived from a hash of the
         * raw field values, making creates idempotent for identical payloads.
         *
         * @param {Object} body - Flat document object (no nested keys, no `_id` or `id` fields).
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, returns `{ id, doc }` instead of `{ id, result: 'created' }`.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.create(
         *   { temperature: 21.5, location: 'Berlin' },
         *   { return: true }
         * )
         * // result.id, result.doc
         */
        create: async (body, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.post(`/api/v2/source/${sourceId}/doc`, { data: body, ...options })
        },
        /**
         * Updates an existing document by its ID.
         *
         * By default this is a full replacement: the incoming `body` replaces all
         * stored fields. Pass `update: true` for a merge update — the service fetches
         * the current raw document, shallow-merges your `body` on top, then re-indexes.
         *
         * @param {string} docId - The document's identifier.
         * @param {Object} body - New field values. Must be flat (no nesting, no reserved fields).
         * @param {Object} [options]
         * @param {boolean} [options.update] - When `true`, merges `body` into the existing document rather than replacing it.
         * @param {boolean} [options.return] - When `true`, returns the resulting document after indexing.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * // Full replacement
         * await insights.Source('Orders').Doc.update('doc-abc', { temperature: 22.0, location: 'Berlin' })
         *
         * // Partial update — only changes temperature, keeps other fields
         * await insights.Source('Orders').Doc.update('doc-abc', { temperature: 22.0 }, { update: true, return: true })
         */
        update: async (docId, body, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.put(`/api/v2/source/${sourceId}/doc/${docId}`, { data: body, ...options })
        },
        /**
         * Deletes a document from both the raw and main indices.
         *
         * @param {string} docId - The document's identifier.
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, returns `{ id, doc }` with the deleted document instead of 204.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.delete('doc-abc', { return: true })
         * // result.id, result.doc
         */
        delete: async (docId, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          const query = options.return ? '?return=true' : ''
          return this.delete(`/api/v2/source/${sourceId}/doc/${docId}${query}`)
        },
        /**
         * Inserts multiple documents in a single bulk request.
         *
         * Each document is independently transformed and hashed. The response includes
         * counts of created and updated documents and a per-document error list for any
         * that failed ES indexing. Pass `return: true` to include the computed document
         * IDs in the response.
         *
         * The batch endpoint also accepts `Content-Type: application/x-ndjson` for
         * streaming ingestion from tools like Vector — in that case send raw JSONL lines
         * directly without the `{ data }` envelope.
         *
         * @param {Object[]} docs - Array of flat document objects.
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, includes `ids` (array of doc IDs) in the response.
         * @returns {Promise<ServiceResponse>}
         * @throws {Error} `scenid/empty-batch` – `docs` is empty or not an array. Thrown
         *   synchronously (before any promise is created), not as a rejection.
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.batchInsert([
         *   { temperature: 21.5, location: 'Berlin' },
         *   { temperature: 19.0, location: 'Hamburg' }
         * ], { return: true })
         * // result.created, result.updated, result.failed, result.ids
         */
        batchInsert: (docs, options = {}) => {
          // Deliberately not `async`: input validation fails synchronously.
          if (!Array.isArray(docs) || !docs.length) throw new Error('scenid/empty-batch')
          return (async () => {
            const sourceId = await this._resolveSource(nameOrId)
            return this.post(`/api/v2/source/${sourceId}/batch`, { data: docs, ...options })
          })()
        }
      }
    })
    /**
     * Returns a handle for fetching documents across multiple sources at once.
     *
     * Source names (e.g. `['Orders', 'Inventory']`) are resolved to sourceIds via
     * the local cache. Results are grouped by source name in the response.
     *
     * @param {string[]} namesOrIds - List of source names or sourceIds.
     * @returns {Object}
     *
     * @example
     * const { result } = await insights.Sources(['Orders', 'Inventory'])
     *   .Docs.find({ humanId: 'HT-4d44d' })
     * // result.total, result.Orders, result.Inventory
     */
    this.Sources = namesOrIds => ({
      Docs: {
        /**
         * Fetches documents from all listed sources matching the given filter.
         *
         * Each key-value pair in `filterObj` is treated as an equality condition.
         * Results are grouped by source name. Rejects with
         * `insights/source-not-found` if any entry of `namesOrIds` cannot be resolved.
         *
         * @param {Object} [filterObj={}] - `{ field: value }` pairs, all eq-matched.
         * @param {Object} [options]
         * @param {number} [options.size=10] - Max docs to return (server caps at 1000).
         * @returns {Promise<ServiceResponse>} `{ total, [sourceName]: docs[] }`
         */
        find: async (filterObj = {}, { size = 10 } = {}) => {
          // Resolved concurrently; on a cold cache all lookups share one source-list fetch.
          const resolved = await Promise.all(namesOrIds.map(n => this._resolveSource(n)))
          const filter = Object.entries(filterObj).map(([field, value]) => ({ field, operator: 'eq', value }))
          const params = new URLSearchParams({ size: String(size) })
          if (filter.length) params.set('filter', JSON.stringify(filter))
          return this.get(`/api/v2/sources/${resolved.join(',')}/docs?${params}`)
        }
      }
    })
  }

  /**
   * Refetches the full source list (`GET /api/v2/source`) and rebuilds the
   * name → sourceId and sourceId → sourceId maps for this serviceUrl.
   *
   * Concurrent callers share one in-flight request: the first caller stores a
   * pending `promise` on the cache entry and later callers just await it.
   * Waiters are always released, even when the fetch fails. Only the initiating
   * caller sees the error, and a failed fetch removes the cache entry entirely.
   *
   * @returns {Promise<void>}
   */
  async _refreshSourceCache () {
    const url = this.serviceUrl
    const existing = _sourceCache.get(url)
    if (existing?.promise) {
      await existing.promise
      return
    }
    let release
    const promise = new Promise(resolve => { release = resolve })
    // Keep any previous data visible while the refetch runs; only mark it pending.
    _sourceCache.set(url, { ...(existing ?? {}), promise })
    try {
      const { result } = await this.get('/api/v2/source')
      const sources = Array.isArray(result) ? result : []
      const byName = new Map()
      const byId = new Map()
      for (const s of sources) {
        byName.set(s.name, s.sourceId)
        byId.set(s.sourceId, s.sourceId)
      }
      _sourceCache.set(url, { byName, byId, fetchedAt: Date.now(), promise: undefined })
    } catch (e) {
      _sourceCache.delete(url)
      throw e
    } finally {
      release()
    }
  }

  /**
   * Looks `key` up in a cache entry, first as a source name and then as a raw sourceId.
   *
   * @param {Object|undefined} entry - A `_sourceCache` entry; may be missing or a
   *   pending placeholder without data.
   * @param {string} key - Source name or sourceId.
   * @returns {string|undefined} The sourceId, or `undefined` when `key` is not present.
   */
  _lookupSourceId (entry, key) {
    if (entry?.byName?.has(key)) return entry.byName.get(key)
    if (entry?.byId?.has(key)) return key
    return undefined
  }

  /**
   * Resolves a source name or sourceId to a sourceId using the shared cache.
   *
   * @param {string} key - Source name or sourceId.
   * @returns {Promise<string>} The sourceId.
   * @throws {Error} `insights/source-not-found` when `key` matches no source, even
   *   after the allowed refresh. Errors from the source-list request propagate.
   */
  async _resolveSource (key) {
    const cached = _sourceCache.get(this.serviceUrl)
    // An entry without `byName` is a placeholder for an in-flight first fetch.
    // Treat it like a cold cache so we wait for that fetch instead of reading it.
    if (cached?.byName && !isCacheStale(cached)) {
      const cachedId = this._lookupSourceId(cached, key)
      if (cachedId !== undefined) return cachedId
      // A miss against a just-fetched list is authoritative, so skip the refetch.
      if (isCacheFresh(cached)) throw new Error('insights/source-not-found')
    }
    await this._refreshSourceCache()
    // Check both maps: raw sourceIds must resolve after a refresh too.
    const resolvedId = this._lookupSourceId(_sourceCache.get(this.serviceUrl), key)
    if (resolvedId !== undefined) return resolvedId
    throw new Error('insights/source-not-found')
  }
}
export default InsightsService
Source