Bsky short link service (#4542)

* bskylink: scaffold service w/ initial config and schema

* bskylink: implement link creation and redirects

* bskylink: tidy

* bskylink: tests

* bskylink: tidy, add error handler

* bskylink: add dockerfile

* bskylink: add build

* bskylink: fix some express plumbing

* bskyweb: proxy fallthrough routes to link service redirects

* bskyweb: build w/ link proxy

* Add AASA to bskylink (#4588)

---------

Co-authored-by: Hailey <me@haileyok.com>
This commit is contained in:
devin ivy 2024-06-21 12:41:06 -04:00 committed by GitHub
parent ba21fddd78
commit 55812b0394
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 2118 additions and 1 deletions

24
bskylink/src/bin.ts Normal file
View file

@ -0,0 +1,24 @@
import {Database, envToCfg, httpLogger, LinkService, readEnv} from './index.js'
async function main() {
const env = readEnv()
const cfg = envToCfg(env)
if (cfg.db.migrationUrl) {
const migrateDb = Database.postgres({
url: cfg.db.migrationUrl,
schema: cfg.db.schema,
})
await migrateDb.migrateToLatestOrThrow()
await migrateDb.close()
}
const link = await LinkService.create(cfg)
await link.start()
httpLogger.info('link service is running')
process.on('SIGTERM', async () => {
httpLogger.info('link service is stopping')
await link.destroy()
httpLogger.info('link service is stopped')
})
}
main()

82
bskylink/src/config.ts Normal file
View file

@ -0,0 +1,82 @@
import {envInt, envList, envStr} from '@atproto/common'
export type Config = {
service: ServiceConfig
db: DbConfig
}
export type ServiceConfig = {
port: number
version?: string
hostnames: string[]
appHostname: string
}
export type DbConfig = {
url: string
migrationUrl?: string
pool: DbPoolConfig
schema?: string
}
export type DbPoolConfig = {
size: number
maxUses: number
idleTimeoutMs: number
}
export type Environment = {
port?: number
version?: string
hostnames: string[]
appHostname?: string
dbPostgresUrl?: string
dbPostgresMigrationUrl?: string
dbPostgresSchema?: string
dbPostgresPoolSize?: number
dbPostgresPoolMaxUses?: number
dbPostgresPoolIdleTimeoutMs?: number
}
export const readEnv = (): Environment => {
return {
port: envInt('LINK_PORT'),
version: envStr('LINK_VERSION'),
hostnames: envList('LINK_HOSTNAMES'),
appHostname: envStr('LINK_APP_HOSTNAME'),
dbPostgresUrl: envStr('LINK_DB_POSTGRES_URL'),
dbPostgresMigrationUrl: envStr('LINK_DB_POSTGRES_MIGRATION_URL'),
dbPostgresSchema: envStr('LINK_DB_POSTGRES_SCHEMA'),
dbPostgresPoolSize: envInt('LINK_DB_POSTGRES_POOL_SIZE'),
dbPostgresPoolMaxUses: envInt('LINK_DB_POSTGRES_POOL_MAX_USES'),
dbPostgresPoolIdleTimeoutMs: envInt(
'LINK_DB_POSTGRES_POOL_IDLE_TIMEOUT_MS',
),
}
}
export const envToCfg = (env: Environment): Config => {
const serviceCfg: ServiceConfig = {
port: env.port ?? 3000,
version: env.version,
hostnames: env.hostnames,
appHostname: env.appHostname || 'bsky.app',
}
if (!env.dbPostgresUrl) {
throw new Error('Must configure postgres url (LINK_DB_POSTGRES_URL)')
}
const dbCfg: DbConfig = {
url: env.dbPostgresUrl,
migrationUrl: env.dbPostgresMigrationUrl,
schema: env.dbPostgresSchema,
pool: {
idleTimeoutMs: env.dbPostgresPoolIdleTimeoutMs ?? 10000,
maxUses: env.dbPostgresPoolMaxUses ?? Infinity,
size: env.dbPostgresPoolSize ?? 10,
},
}
return {
service: serviceCfg,
db: dbCfg,
}
}

33
bskylink/src/context.ts Normal file
View file

@ -0,0 +1,33 @@
import {Config} from './config.js'
import Database from './db/index.js'
export type AppContextOptions = {
cfg: Config
db: Database
}
export class AppContext {
cfg: Config
db: Database
abortController = new AbortController()
constructor(private opts: AppContextOptions) {
this.cfg = this.opts.cfg
this.db = this.opts.db
}
static async fromConfig(cfg: Config, overrides?: Partial<AppContextOptions>) {
const db = Database.postgres({
url: cfg.db.url,
schema: cfg.db.schema,
poolSize: cfg.db.pool.size,
poolMaxUses: cfg.db.pool.maxUses,
poolIdleTimeoutMs: cfg.db.pool.idleTimeoutMs,
})
return new AppContext({
cfg,
db,
...overrides,
})
}
}

174
bskylink/src/db/index.ts Normal file
View file

@ -0,0 +1,174 @@
import assert from 'assert'
import {
Kysely,
KyselyPlugin,
Migrator,
PluginTransformQueryArgs,
PluginTransformResultArgs,
PostgresDialect,
QueryResult,
RootOperationNode,
UnknownRow,
} from 'kysely'
import {default as Pg} from 'pg'
import {dbLogger as log} from '../logger.js'
import {default as migrations} from './migrations/index.js'
import {DbMigrationProvider} from './migrations/provider.js'
import {DbSchema} from './schema.js'
export class Database {
migrator: Migrator
destroyed = false
constructor(public db: Kysely<DbSchema>, public cfg: PgConfig) {
this.migrator = new Migrator({
db,
migrationTableSchema: cfg.schema,
provider: new DbMigrationProvider(migrations),
})
}
static postgres(opts: PgOptions): Database {
const {schema, url, txLockNonce} = opts
const pool =
opts.pool ??
new Pg.Pool({
connectionString: url,
max: opts.poolSize,
maxUses: opts.poolMaxUses,
idleTimeoutMillis: opts.poolIdleTimeoutMs,
})
// Select count(*) and other pg bigints as js integer
Pg.types.setTypeParser(Pg.types.builtins.INT8, n => parseInt(n, 10))
// Setup schema usage, primarily for test parallelism (each test suite runs in its own pg schema)
if (schema && !/^[a-z_]+$/i.test(schema)) {
throw new Error(`Postgres schema must only contain [A-Za-z_]: ${schema}`)
}
pool.on('error', onPoolError)
const db = new Kysely<DbSchema>({
dialect: new PostgresDialect({pool}),
})
return new Database(db, {
pool,
schema,
url,
txLockNonce,
})
}
async transaction<T>(fn: (db: Database) => Promise<T>): Promise<T> {
const leakyTxPlugin = new LeakyTxPlugin()
return this.db
.withPlugin(leakyTxPlugin)
.transaction()
.execute(txn => {
const dbTxn = new Database(txn, this.cfg)
return fn(dbTxn)
.catch(async err => {
leakyTxPlugin.endTx()
// ensure that all in-flight queries are flushed & the connection is open
await dbTxn.db.getExecutor().provideConnection(async () => {})
throw err
})
.finally(() => leakyTxPlugin.endTx())
})
}
get schema(): string | undefined {
return this.cfg.schema
}
get isTransaction() {
return this.db.isTransaction
}
assertTransaction() {
assert(this.isTransaction, 'Transaction required')
}
assertNotTransaction() {
assert(!this.isTransaction, 'Cannot be in a transaction')
}
async close(): Promise<void> {
if (this.destroyed) return
await this.db.destroy()
this.destroyed = true
}
async migrateToOrThrow(migration: string) {
if (this.schema) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const {error, results} = await this.migrator.migrateTo(migration)
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}
async migrateToLatestOrThrow() {
if (this.schema) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const {error, results} = await this.migrator.migrateToLatest()
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}
}
export default Database
export type PgConfig = {
pool: Pg.Pool
url: string
schema?: string
txLockNonce?: string
}
type PgOptions = {
url: string
pool?: Pg.Pool
schema?: string
poolSize?: number
poolMaxUses?: number
poolIdleTimeoutMs?: number
txLockNonce?: string
}
class LeakyTxPlugin implements KyselyPlugin {
private txOver = false
endTx() {
this.txOver = true
}
transformQuery(args: PluginTransformQueryArgs): RootOperationNode {
if (this.txOver) {
throw new Error('tx already failed')
}
return args.node
}
async transformResult(
args: PluginTransformResultArgs,
): Promise<QueryResult<UnknownRow>> {
return args.result
}
}
const onPoolError = (err: Error) => log.error({err}, 'db pool error')

View file

@ -0,0 +1,15 @@
import {Kysely} from 'kysely'
export async function up(db: Kysely<unknown>): Promise<void> {
await db.schema
.createTable('link')
.addColumn('id', 'varchar', col => col.primaryKey())
.addColumn('type', 'smallint', col => col.notNull()) // integer enum: 1->starterpack
.addColumn('path', 'varchar', col => col.notNull())
.addUniqueConstraint('link_path_unique', ['path'])
.execute()
}
export async function down(db: Kysely<unknown>): Promise<void> {
await db.schema.dropTable('link').execute()
}

View file

@ -0,0 +1,5 @@
import * as init from './001-init.js'
export default {
'001': init,
}

View file

@ -0,0 +1,8 @@
import {Migration, MigrationProvider} from 'kysely'
export class DbMigrationProvider implements MigrationProvider {
constructor(private migrations: Record<string, Migration>) {}
async getMigrations(): Promise<Record<string, Migration>> {
return this.migrations
}
}

17
bskylink/src/db/schema.ts Normal file
View file

@ -0,0 +1,17 @@
import {Selectable} from 'kysely'
export type DbSchema = {
link: Link
}
export interface Link {
id: string
type: LinkType
path: string
}
export enum LinkType {
StarterPack = 1,
}
export type LinkEntry = Selectable<Link>

45
bskylink/src/index.ts Normal file
View file

@ -0,0 +1,45 @@
import events from 'node:events'
import http from 'node:http'
import cors from 'cors'
import express from 'express'
import {createHttpTerminator, HttpTerminator} from 'http-terminator'
import {Config} from './config.js'
import {AppContext} from './context.js'
import {default as routes, errorHandler} from './routes/index.js'
export * from './config.js'
export * from './db/index.js'
export * from './logger.js'
export class LinkService {
public server?: http.Server
private terminator?: HttpTerminator
constructor(public app: express.Application, public ctx: AppContext) {}
static async create(cfg: Config): Promise<LinkService> {
let app = express()
app.use(cors())
const ctx = await AppContext.fromConfig(cfg)
app = routes(ctx, app)
app.use(errorHandler)
return new LinkService(app, ctx)
}
async start() {
this.server = this.app.listen(this.ctx.cfg.service.port)
this.server.keepAliveTimeout = 90000
this.terminator = createHttpTerminator({server: this.server})
await events.once(this.server, 'listening')
}
async destroy() {
this.ctx.abortController.abort()
await this.terminator?.terminate()
await this.ctx.db.close()
}
}

4
bskylink/src/logger.ts Normal file
View file

@ -0,0 +1,4 @@
import {subsystemLogger} from '@atproto/common'
export const httpLogger = subsystemLogger('bskylink')
export const dbLogger = subsystemLogger('bskylink:db')

View file

@ -0,0 +1,111 @@
import assert from 'node:assert'
import bodyParser from 'body-parser'
import {Express, Request} from 'express'
import {AppContext} from '../context.js'
import {LinkType} from '../db/schema.js'
import {randomId} from '../util.js'
import {handler} from './util.js'
export default function (ctx: AppContext, app: Express) {
return app.post(
'/link',
bodyParser.json(),
handler(async (req, res) => {
let path: string
if (typeof req.body?.path === 'string') {
path = req.body.path
} else {
return res.status(400).json({
error: 'InvalidPath',
message: '"path" parameter is missing or not a string',
})
}
if (!path.startsWith('/')) {
return res.status(400).json({
error: 'InvalidPath',
message:
'"path" parameter must be formatted as a path, starting with a "/"',
})
}
const parts = getPathParts(path)
if (parts.length === 3 && parts[0] === 'start') {
// link pattern: /start/{did}/{rkey}
if (!parts[1].startsWith('did:')) {
// enforce strong links
return res.status(400).json({
error: 'InvalidPath',
message:
'"path" parameter for starter pack must contain the actor\'s DID',
})
}
const id = await ensureLink(ctx, LinkType.StarterPack, parts)
return res.json({url: getUrl(ctx, req, id)})
}
return res.status(400).json({
error: 'InvalidPath',
message: '"path" parameter does not have a known format',
})
}),
)
}
const ensureLink = async (ctx: AppContext, type: LinkType, parts: string[]) => {
const normalizedPath = normalizedPathFromParts(parts)
const created = await ctx.db.db
.insertInto('link')
.values({
id: randomId(),
type,
path: normalizedPath,
})
.onConflict(oc => oc.column('path').doNothing())
.returningAll()
.executeTakeFirst()
if (created) {
return created.id
}
const found = await ctx.db.db
.selectFrom('link')
.selectAll()
.where('path', '=', normalizedPath)
.executeTakeFirstOrThrow()
return found.id
}
const getUrl = (ctx: AppContext, req: Request, id: string) => {
if (!ctx.cfg.service.hostnames.length) {
assert(req.headers.host, 'request must be made with host header')
const baseUrl =
req.protocol === 'http' && req.headers.host.startsWith('localhost:')
? `http://${req.headers.host}`
: `https://${req.headers.host}`
return `${baseUrl}/${id}`
}
const baseUrl = ctx.cfg.service.hostnames.includes(req.headers.host)
? `https://${req.headers.host}`
: `https://${ctx.cfg.service.hostnames[0]}`
return `${baseUrl}/${id}`
}
const normalizedPathFromParts = (parts: string[]): string => {
return (
'/' +
parts
.map(encodeURIComponent)
.map(part => part.replaceAll('%3A', ':')) // preserve colons
.join('/')
)
}
const getPathParts = (path: string): string[] => {
if (path === '/') return []
if (path.endsWith('/')) {
path = path.slice(0, -1) // ignore trailing slash
}
return path
.slice(1) // remove leading slash
.split('/')
.map(decodeURIComponent)
}

View file

@ -0,0 +1,20 @@
import {Express} from 'express'
import {sql} from 'kysely'
import {AppContext} from '../context.js'
import {handler} from './util.js'
export default function (ctx: AppContext, app: Express) {
return app.get(
'/_health',
handler(async (_req, res) => {
const {version} = ctx.cfg.service
try {
await sql`select 1`.execute(ctx.db.db)
return res.send({version})
} catch (err) {
return res.status(503).send({version, error: 'Service Unavailable'})
}
}),
)
}

View file

@ -0,0 +1,17 @@
import {Express} from 'express'
import {AppContext} from '../context.js'
import {default as create} from './create.js'
import {default as health} from './health.js'
import {default as redirect} from './redirect.js'
import {default as siteAssociation} from './siteAssociation.js'
export * from './util.js'
export default function (ctx: AppContext, app: Express) {
app = health(ctx, app) // GET /_health
app = siteAssociation(ctx, app) // GET /.well-known/apple-app-site-association
app = create(ctx, app) // POST /link
app = redirect(ctx, app) // GET /:linkId (should go last due to permissive matching)
return app
}

View file

@ -0,0 +1,40 @@
import assert from 'node:assert'
import {DAY, SECOND} from '@atproto/common'
import {Express} from 'express'
import {AppContext} from '../context.js'
import {handler} from './util.js'
export default function (ctx: AppContext, app: Express) {
return app.get(
'/:linkId',
handler(async (req, res) => {
const linkId = req.params.linkId
assert(
typeof linkId === 'string',
'express guarantees id parameter is a string',
)
const found = await ctx.db.db
.selectFrom('link')
.selectAll()
.where('id', '=', linkId)
.executeTakeFirst()
if (!found) {
// potentially broken or mistyped link— send user to the app
res.setHeader('Location', `https://${ctx.cfg.service.appHostname}`)
res.setHeader('Cache-Control', 'no-store')
return res.status(302).end()
}
// build url from original url in order to preserve query params
const url = new URL(
req.originalUrl,
`https://${ctx.cfg.service.appHostname}`,
)
url.pathname = found.path
res.setHeader('Location', url.href)
res.setHeader('Cache-Control', `max-age=${(7 * DAY) / SECOND}`)
return res.status(301).end()
}),
)
}

View file

@ -0,0 +1,13 @@
import {Express} from 'express'
import {AppContext} from '../context.js'
export default function (ctx: AppContext, app: Express) {
return app.get('/.well-known/apple-app-site-association', (req, res) => {
res.json({
appclips: {
apps: ['B3LX46C5HS.xyz.blueskyweb.app.AppClip'],
},
})
})
}

View file

@ -0,0 +1,23 @@
import {ErrorRequestHandler, Request, RequestHandler, Response} from 'express'
import {httpLogger} from '../logger.js'
export type Handler = (req: Request, res: Response) => Awaited<void>
export const handler = (runHandler: Handler): RequestHandler => {
return async (req, res, next) => {
try {
await runHandler(req, res)
} catch (err) {
next(err)
}
}
}
export const errorHandler: ErrorRequestHandler = (err, _req, res, next) => {
httpLogger.error({err}, 'request error')
if (res.headersSent) {
return next(err)
}
return res.status(500).end('server error')
}

8
bskylink/src/util.ts Normal file
View file

@ -0,0 +1,8 @@
import {randomBytes} from 'node:crypto'
import {toString} from 'uint8arrays'
// 40bit random id of 5-7 characters
export const randomId = () => {
return toString(randomBytes(5), 'base58btc')
}