Source

services/InsightsService.class.js

import ScenidCloudService from './ScenidCloudService.class.js'

/**
 * Source lookup cache shared by all InsightsService instances in this module,
 * keyed by service URL.
 *
 * Entry shape: `{ byName, byId, fetchedAt, promise }`. While the very first
 * fetch for a URL is in flight the entry holds only `promise`; `byName`,
 * `byId` and `fetchedAt` are absent until that fetch completes.
 */
const _sourceCache = new Map()

/** Age in milliseconds (5 minutes) at which a fetched source list counts as stale. */
const CACHE_TTL = 5 * 60 * 1000

/** Age in milliseconds (10 seconds) below which a fetched source list counts as fresh. */
const CACHE_FRESH_GUARD = 10 * 1000

/**
 * Tells whether a cache entry must be re-fetched before it can be used.
 *
 * An entry without a numeric `fetchedAt` (missing entry, or a placeholder that
 * only carries an in-flight `promise`) is always stale. Without that check the
 * age would be `NaN`, every comparison against it would be `false`, and the
 * placeholder would be mistaken for a usable entry.
 *
 * @param {Object} [entry] - Cache entry for one service URL.
 * @returns {boolean}
 */
const isCacheStale = entry =>
  !Number.isFinite(entry?.fetchedAt) || (Date.now() - entry.fetchedAt) >= CACHE_TTL

/**
 * Tells whether a cache entry was fetched less than `CACHE_FRESH_GUARD` ms ago.
 *
 * @param {Object} [entry] - Cache entry for one service URL.
 * @returns {boolean} Always a boolean; `false` when the entry has no numeric `fetchedAt`.
 */
const isCacheFresh = entry =>
  Number.isFinite(entry?.fetchedAt) && (Date.now() - entry.fetchedAt) < CACHE_FRESH_GUARD

/**
 * Client for the Scenid Insights Service.
 *
 * Provides access to data sources and their documents via a factory pattern:
 * call `Source(nameOrId)` to get a scoped handle for a specific source, then
 * use the returned object's methods to query or mutate documents.
 *
 * Source names (e.g. `'Orders'`) are resolved to their sourceId automatically
 * using a per-service-URL cache backed by `GET /api/v2/source`. The cache is
 * populated on first use and refreshed once it is 5 minutes old. A lookup miss
 * also triggers a refresh, unless the cache was fetched within the last
 * 10 seconds. A value that matches neither a source name nor a sourceId after
 * that makes the call reject with `insights/source-not-found`.
 *
 * Obtained via `sdk.asAdmin().Insights()` or `sdk.asUser(idToken).Insights()`.
 *
 * @extends ScenidCloudService
 *
 * @example
 * const insights = sdk.asAdmin().Insights()
 *
 * // Read source metadata by name
 * const { result } = await insights.Source('Orders').get()
 *
 * // Create a document
 * await insights.Source('Orders').Doc.create({ temperature: 21.5, unit: 'C' })
 */
class InsightsService extends ScenidCloudService {
  /**
   * @param {string|function(): Promise.<string>} tokenSource
   * @param {string} serviceUrl
   */
  constructor (tokenSource, serviceUrl) {
    super(tokenSource, serviceUrl)

    /**
     * Returns a scoped handle for the given source, identified by name or sourceId.
     *
     * Nothing is resolved when the handle is created; each method on the handle
     * resolves `nameOrId` to a sourceId when it is called. Lookups are served
     * from the shared source cache. On a miss the source list is re-fetched
     * once (unless it was fetched within the last 10 seconds); if the value
     * still matches no source name or sourceId, the method's promise rejects
     * with `insights/source-not-found`.
     *
     * @param {string} nameOrId - The source's human-readable name (e.g. `'Orders'`) or raw sourceId.
     * @returns {SourceHandle}
     *
     * @example
     * const src = insights.Source('Orders')
     * const { result } = await src.get(true)  // include formulator schema
     */
    this.Source = nameOrId => ({
      /**
       * Fetches metadata for the source, optionally including its formulator validation schema.
       *
       * @param {boolean} [withSchema=false] - When `true`, the response includes a `schema`
       *   object (`{ validation, render, translations }`) that can be passed directly to
       *   `FormulatorForm`.
       * @returns {Promise<ServiceResponse>}
       *
       * @example
       * const { result } = await insights.Source('Orders').get(true)
       * // result.schema.validation, result.schema.render, result.schema.translations
       */
      get: async (withSchema = false) => {
        const sourceId = await this._resolveSource(nameOrId)
        const query = withSchema ? '?withSchema=true' : ''
        return this.get(`/api/v2/source/${sourceId}${query}`)
      },

      /**
       * Runs an aggregation query against the source's data.
       *
       * Supports filtering, sorting, field selection, pagination via cursor (`pageKey`),
       * and Elasticsearch-style aggregations. Pass `raw: true` to query the raw index
       * (unprocessed field values as ingested).
       *
       * @param {Object} params - Aggregation parameters.
       * @param {number} [params.size] - Number of rows to return.
       * @param {string[]} [params.fields] - Fields to include in each row.
       * @param {Object} [params.sort] - Sort descriptor.
       * @param {Object} [params.filter] - Filter conditions.
       * @param {Object} [params.aggs] - Elasticsearch aggregation definitions.
       * @param {string} [params.pageKey] - Cursor for the next page (from a previous response's `nextPageKey`).
       * @param {boolean} [params.raw] - When `true`, queries the raw index.
       * @returns {Promise<ServiceResponse>}
       *
       * @example
       * const { result } = await insights.Source('Orders').aggregate({
       *   size: 100,
       *   filter: { status: 'active' },
       *   sort: { createdAt: 'desc' }
       * })
       */
      aggregate: async params => {
        const sourceId = await this._resolveSource(nameOrId)
        return this.post(`/api/v2/source/${sourceId}/aggregate`, params)
      },

      /**
       * Sends `docs` as the request body of `POST /api/v2/source/{sourceId}/push`.
       *
       * NOTE(review): the payload shape the push endpoint expects, and how it
       * differs from `Doc.batchInsert`, is not visible in this file — confirm
       * against the service API before relying on it.
       *
       * @param {*} docs - Request body, passed through unchanged.
       * @returns {Promise<ServiceResponse>}
       */
      push: async docs => {
        const sourceId = await this._resolveSource(nameOrId)
        return this.post(`/api/v2/source/${sourceId}/push`, docs)
      },

      /**
       * Document-level operations scoped to this source.
       * @namespace
       */
      Doc: {
        /**
         * Fetches a single document by its ID from the main (transformed) index.
         *
         * @param {string} docId - The document's identifier.
         * @returns {Promise<ServiceResponse>}
         */
        get: async docId => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.get(`/api/v2/source/${sourceId}/doc/${docId}`)
        },

        /**
         * Creates a new document in the source.
         *
         * The document is run through the source's field type transformations
         * and stored in both the raw index (original values) and the main index
         * (transformed values). The document ID is derived from a hash of the
         * raw field values, making creates idempotent for identical payloads.
         *
         * @param {Object} body - Flat document object (no nested keys, no `_id` or `id` fields).
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, returns `{ id, doc }` instead of `{ id, result: 'created' }`.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.create(
         *   { temperature: 21.5, location: 'Berlin' },
         *   { return: true }
         * )
         * // result.id, result.doc
         */
        create: async (body, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.post(`/api/v2/source/${sourceId}/doc`, { data: body, ...options })
        },

        /**
         * Updates an existing document by its ID.
         *
         * By default this is a full replacement: the incoming `body` replaces all
         * stored fields. Pass `update: true` for a merge update — the service fetches
         * the current raw document, shallow-merges your `body` on top, then re-indexes.
         *
         * @param {string} docId - The document's identifier.
         * @param {Object} body - New field values. Must be flat (no nesting, no reserved fields).
         * @param {Object} [options]
         * @param {boolean} [options.update] - When `true`, merges `body` into the existing document rather than replacing it.
         * @param {boolean} [options.return] - When `true`, returns the resulting document after indexing.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * // Full replacement
         * await insights.Source('Orders').Doc.update('doc-abc', { temperature: 22.0, location: 'Berlin' })
         *
         * // Partial update — only changes temperature, keeps other fields
         * await insights.Source('Orders').Doc.update('doc-abc', { temperature: 22.0 }, { update: true, return: true })
         */
        update: async (docId, body, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          return this.put(`/api/v2/source/${sourceId}/doc/${docId}`, { data: body, ...options })
        },

        /**
         * Deletes a document from both the raw and main indices.
         *
         * @param {string} docId - The document's identifier.
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, returns `{ id, doc }` with the deleted document instead of 204.
         * @returns {Promise<ServiceResponse>}
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.delete('doc-abc', { return: true })
         * // result.id, result.doc
         */
        delete: async (docId, options = {}) => {
          const sourceId = await this._resolveSource(nameOrId)
          const query = options.return ? '?return=true' : ''
          return this.delete(`/api/v2/source/${sourceId}/doc/${docId}${query}`)
        },

        /**
         * Inserts multiple documents in a single bulk request.
         *
         * Each document is independently transformed and hashed. The response includes
         * counts of created and updated documents and a per-document error list for any
         * that failed ES indexing. Pass `return: true` to include the computed document
         * IDs in the response.
         *
         * The batch endpoint also accepts `Content-Type: application/x-ndjson` for
         * streaming ingestion from tools like Vector — in that case send raw JSONL lines
         * directly without the `{ data }` envelope.
         *
         * @param {Object[]} docs - Array of flat document objects.
         * @param {Object} [options]
         * @param {boolean} [options.return] - When `true`, includes `ids` (array of doc IDs) in the response.
         * @returns {Promise<ServiceResponse>}
         * @throws {Error} `scenid/empty-batch` – `docs` is empty or not an array.
         *   Thrown synchronously, before any promise is returned.
         *
         * @example
         * const { result } = await insights.Source('Orders').Doc.batchInsert([
         *   { temperature: 21.5, location: 'Berlin' },
         *   { temperature: 19.0, location: 'Hamburg' }
         * ], { return: true })
         * // result.created, result.updated, result.failed, result.ids
         */
        batchInsert: (docs, options = {}) => {
          // Deliberately not an async arrow: the validation error is thrown
          // synchronously instead of surfacing as a rejected promise.
          if (!Array.isArray(docs) || !docs.length) throw new Error('scenid/empty-batch')
          return (async () => {
            const sourceId = await this._resolveSource(nameOrId)
            return this.post(`/api/v2/source/${sourceId}/batch`, { data: docs, ...options })
          })()
        }
      }
    })

    /**
     * Returns a handle for fetching documents across multiple sources at once.
     *
     * Source names (e.g. `['Orders', 'Inventory']`) are resolved to sourceIds via
     * the local cache. Results are grouped by source name in the response.
     *
     * @param {string[]} namesOrIds - List of source names or sourceIds.
     * @returns {Object}
     *
     * @example
     * const { result } = await insights.Sources(['Orders', 'Inventory'])
     *   .Docs.find({ humanId: 'HT-4d44d' })
     * // result.total, result.Orders, result.Inventory
     */
    this.Sources = namesOrIds => ({
      Docs: {
        /**
         * Fetches documents from all listed sources matching the given filter.
         *
         * Each key-value pair in `filterObj` is treated as an equality condition.
         * Results are grouped by source name.
         *
         * @param {Object} [filterObj={}] - `{ field: value }` pairs, all eq-matched.
         * @param {Object} [options]
         * @param {number} [options.size=10] - Max docs to return (server caps at 1000).
         * @returns {Promise<ServiceResponse>} `{ total, [sourceName]: docs[] }`
         */
        find: async (filterObj = {}, { size = 10 } = {}) => {
          // All names are resolved concurrently; they share a single source-list fetch.
          const resolved = await Promise.all(namesOrIds.map(n => this._resolveSource(n)))
          const filter = Object.entries(filterObj).map(([field, value]) => ({ field, operator: 'eq', value }))
          const params = new URLSearchParams({ size: String(size) })
          if (filter.length) params.set('filter', JSON.stringify(filter))
          return this.get(`/api/v2/sources/${resolved.join(',')}/docs?${params}`)
        }
      }
    })
  }

  /**
   * Reloads the source list for this service URL into the shared module cache.
   *
   * At most one fetch runs per service URL: if the cache entry already carries
   * an in-flight `promise`, this call just waits for it. Otherwise it stores a
   * placeholder promise on the entry, fetches `GET /api/v2/source`, and replaces
   * the entry with fresh `byName` / `byId` maps and a new `fetchedAt`.
   *
   * On failure the cache entry is removed and the error is rethrown to the
   * caller that started the fetch. Callers that were waiting on the in-flight
   * promise are released without an error.
   *
   * @returns {Promise<void>}
   * @private
   */
  async _refreshSourceCache () {
    const existing = _sourceCache.get(this.serviceUrl)
    if (existing?.promise) {
      await existing.promise
      return
    }

    let release
    const promise = new Promise(resolve => { release = resolve })
    // Keep any previously fetched maps on the entry while the new fetch runs.
    _sourceCache.set(this.serviceUrl, { ...(existing ?? {}), promise })

    try {
      const { result } = await this.get('/api/v2/source')
      const byName = new Map()
      const byId = new Map()
      // A non-array result is treated as an empty source list.
      for (const source of Array.isArray(result) ? result : []) {
        byName.set(source.name, source.sourceId)
        byId.set(source.sourceId, source.sourceId)
      }
      _sourceCache.set(this.serviceUrl, { byName, byId, fetchedAt: Date.now(), promise: undefined })
    } catch (e) {
      _sourceCache.delete(this.serviceUrl)
      throw e
    } finally {
      // Always release waiters, after the cache has been updated or cleared.
      release()
    }
  }

  /**
   * Resolves a source name or sourceId to a sourceId using the shared cache.
   *
   * Names take precedence over ids. A usable cache (maps present and not stale)
   * is consulted first. On a miss the source list is re-fetched, except when
   * the cache is still fresh, in which case the miss is final. After a refresh
   * both the name map and the id map are checked again.
   *
   * @param {string} key - Source name or sourceId.
   * @returns {Promise<string>} The sourceId.
   * @throws {Error} `insights/source-not-found` – `key` matches no known source name or sourceId.
   * @private
   */
  async _resolveSource (key) {
    // Wrapped in an object so a hit is distinguishable from a missing entry.
    const find = entry => {
      if (entry?.byName?.has(key)) return { sourceId: entry.byName.get(key) }
      if (entry?.byId?.has(key)) return { sourceId: key }
      return null
    }

    const cached = _sourceCache.get(this.serviceUrl)
    // While the first fetch is in flight the entry holds only `promise`; it
    // has no maps and must not be treated as a usable cache.
    const populated = Boolean(cached?.byName && cached?.byId)

    if (populated && !isCacheStale(cached)) {
      const hit = find(cached)
      if (hit) return hit.sourceId
      // A fresh cache is trusted: do not hit the service again for this miss.
      if (isCacheFresh(cached)) throw new Error('insights/source-not-found')
    }

    await this._refreshSourceCache()
    const hit = find(_sourceCache.get(this.serviceUrl))
    if (hit) return hit.sourceId
    throw new Error('insights/source-not-found')
  }
}

export default InsightsService