Fix view in migration. Add by lang metric to consumer.
parent
600dac7694
commit
db425b1d5f
4
Makefile
4
Makefile
|
@ -39,11 +39,11 @@ logs:
|
|||
psql:
|
||||
@docker compose exec -it postgres psql -U postgres -d bluesky
|
||||
|
||||
init-db: init.sql
|
||||
init-db:
|
||||
@docker compose up -d --build lister
|
||||
@sleep 10
|
||||
@docker compose stop lister
|
||||
@cat db-migration/init.sql | docker exec -i "$$(docker compose ps --format '{{.Names}}' postgres)" psql -U postgres -d bluesky
|
||||
@cat ./db-migration/init.sql | docker exec -i "$$(docker compose ps --format '{{.Names}}' postgres)" psql -U postgres -d bluesky
|
||||
|
||||
# ---------------------------- Database ----------------------------
|
||||
|
||||
|
|
|
@ -295,6 +295,14 @@ func (c *Consumer) processMessage(ctx context.Context, typ string, r io.Reader,
|
|||
log.Warn().Msgf("Unexpected key format: %q", k)
|
||||
continue
|
||||
}
|
||||
langs, _, err := repo.GetLang(ctx, v)
|
||||
if err == nil {
|
||||
lang := ""
|
||||
if len(langs) != 0 {
|
||||
lang = langs[0]
|
||||
}
|
||||
postsByLanguageIndexed.WithLabelValues(c.remote.Host, lang).Inc()
|
||||
}
|
||||
recs = append(recs, repo.Record{
|
||||
Repo: models.ID(repoInfo.ID),
|
||||
Collection: parts[0],
|
||||
|
|
|
@ -39,6 +39,25 @@ type Config struct {
|
|||
|
||||
var config Config
|
||||
|
||||
// TODO: figure out how to use a custom collector to export metrics with explicit timestamps.
|
||||
|
||||
// type LangTimestampCollector struct {
|
||||
// metric *prometheus.Desc
|
||||
// }
|
||||
|
||||
// func (c *LangTimestampCollector) Describe(ch chan<- *prometheus.Desc) {
|
||||
// ch <- c.metric
|
||||
// }
|
||||
|
||||
// func (c *LangTimestampCollector) Collect(ch chan<- prometheus.Metric) {
|
||||
// // your logic should be placed here
|
||||
|
||||
// t := time.Date(2009, time.November, 10, 23, 0, 0, 12345678, time.UTC)
|
||||
// s := prometheus.NewMetricWithTimestamp(t, prometheus.MustNewConstMetric(c.metric, prometheus.CounterValue, 123))
|
||||
|
||||
// ch <- s
|
||||
// }
|
||||
|
||||
func runMain(ctx context.Context) error {
|
||||
ctx = setupLogging(ctx)
|
||||
log := zerolog.Ctx(ctx)
|
||||
|
@ -69,6 +88,16 @@ func runMain(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// collector := &LangTimestampCollector{
|
||||
// metric: prometheus.NewDesc(
|
||||
// "indexer_posts_by_language_timestamp_count",
|
||||
// "Language metric with custom TS",
|
||||
// nil,
|
||||
// nil,
|
||||
// ),
|
||||
// }
|
||||
// prometheus.MustRegister(collector)
|
||||
|
||||
log.Info().Msgf("Starting HTTP listener on %q...", config.MetricsPort)
|
||||
http.Handle("/metrics", promhttp.Handler())
|
||||
srv := &http.Server{Addr: fmt.Sprintf(":%s", config.MetricsPort)}
|
||||
|
|
|
@ -19,3 +19,8 @@ var reposDiscovered = promauto.NewCounterVec(prometheus.CounterOpts{
|
|||
Name: "repo_discovered_counter",
|
||||
Help: "Counter of newly discovered repos",
|
||||
}, []string{"remote"})
|
||||
|
||||
// postsByLanguageIndexed counts posts by language. Each series is labelled
// with the "remote" the post came from and its "lang".
var postsByLanguageIndexed = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
Name: "indexer_posts_by_language_count",
|
||||
Help: "Number of posts by language",
|
||||
}, []string{"remote", "lang"})
|
||||
|
|
|
@ -35,11 +35,6 @@ var recordsInserted = promauto.NewCounter(prometheus.CounterOpts{
|
|||
Help: "Number of records inserted into DB",
|
||||
})
|
||||
|
||||
// var postsByLanguageIndexed = promauto.NewCounterVec(prometheus.CounterOpts{
|
||||
// Name: "indexer_posts_by_language_inserted_count",
|
||||
// Help: "Number of posts inserted into DB by language",
|
||||
// }, []string{"lang"})
|
||||
|
||||
var workerPoolSize = promauto.NewGauge(prometheus.GaugeOpts{
|
||||
Name: "indexer_workers_count",
|
||||
Help: "Current number of workers running",
|
||||
|
|
|
@ -221,10 +221,7 @@ retry:
|
|||
continue
|
||||
}
|
||||
v = regexp.MustCompile(`[^\\](\\\\)*(\\u0000)`).ReplaceAll(v, []byte(`$1<0x00>`))
|
||||
// lang, err := repo.GetLang(ctx, v)
|
||||
// if err == nil {
|
||||
// postsByLanguageIndexed.WithLabelValues(u.String(), lang).Inc()
|
||||
// }
|
||||
|
||||
recs = append(recs, repo.Record{
|
||||
Repo: models.ID(work.Repo.ID),
|
||||
Collection: parts[0],
|
||||
|
|
|
@ -23,8 +23,22 @@ partition of records for values in ('app.bsky.feed.repost');
|
|||
create table records_profile
|
||||
partition of records for values in ('app.bsky.actor.profile');
|
||||
|
||||
ALTER TABLE records_like
|
||||
ADD CHECK (collection in ('app.bsky.feed.like'));
|
||||
|
||||
-- SLOW, can run overnight, make sure to run in tmux or eternal terminal
|
||||
ALTER TABLE records_post
|
||||
ADD CHECK (collection in ('app.bsky.feed.post'));
|
||||
|
||||
ALTER TABLE records_follow
|
||||
ADD CHECK (collection in ('app.bsky.graph.follow'));
|
||||
|
||||
ALTER TABLE records_repost
|
||||
ADD CHECK (collection in ('app.bsky.feed.repost'));
|
||||
|
||||
ALTER TABLE records_profile
|
||||
ADD CHECK (collection in ('app.bsky.actor.profile'));
|
||||
|
||||
-- SLOW, can run overnight
|
||||
with moved_rows as (
|
||||
delete from records_like r
|
||||
where collection <> 'app.bsky.feed.like'
|
||||
|
@ -32,9 +46,9 @@ with moved_rows as (
|
|||
)
|
||||
insert into records select * from moved_rows;
|
||||
|
||||
-- ULTRA SLOW, DO NOT RUN on large DB
|
||||
alter table records attach partition records_like for values in ('app.bsky.feed.like');
|
||||
|
||||
|
||||
create index idx_like_subject
|
||||
on records_like
|
||||
(split_part(jsonb_extract_path_text(content, 'subject', 'uri'), '/', 3));
|
|
@ -22,7 +22,7 @@ create index post_created_at on records_post (parse_timestamp(jsonb_extract_path
|
|||
|
||||
create view posts as
|
||||
select *, jsonb_extract_path(content, 'langs') as langs,
|
||||
parse_timestamp(jsonb_extract_path_text(content, 'createdAt')) as created_at
|
||||
parse_timestamp(jsonb_extract_path_text(content, 'createdAt')) as content_created_at
|
||||
from records_post;
|
||||
|
||||
explain select count(*) from posts where langs ? 'uk' and content_created_at > now() - interval '1 day';
|
|
@ -7,6 +7,15 @@ partition of records for values in ('app.bsky.graph.listblock');
|
|||
create table records_listitem
|
||||
partition of records for values in ('app.bsky.graph.listitem');
|
||||
|
||||
ALTER TABLE records_list
|
||||
ADD CHECK (collection in ('app.bsky.graph.list'));
|
||||
|
||||
ALTER TABLE records_listblock
|
||||
ADD CHECK (collection in ('app.bsky.graph.listblock'));
|
||||
|
||||
ALTER TABLE records_listitem
|
||||
ADD CHECK (collection in ('app.bsky.graph.listitem'));
|
||||
|
||||
with moved_rows as (
|
||||
delete from records_default r
|
||||
where collection in ('app.bsky.graph.list', 'app.bsky.graph.listblock', 'app.bsky.graph.listitem')
|
|
@ -56,6 +56,11 @@ Restart errors
|
|||
|
||||
`update repos set failed_attempts=0, last_error='' where failed_attempts >0;`
|
||||
|
||||
# MONITORING
|
||||
|
||||
Enable more verbose logging for queries (levels range from DEBUG1 to DEBUG5):
|
||||
`set client_min_messages = 'DEBUG5';`
|
||||
|
||||
List currently running queries, oldest first, to spot slow ones:
|
||||
```
|
||||
SELECT pid, age(clock_timestamp(), query_start), state, query
|
||||
|
@ -64,6 +69,9 @@ WHERE query != '<IDLE>' AND query NOT ILIKE '%pg_stat_activity%'
|
|||
ORDER BY query_start asc;
|
||||
```
|
||||
|
||||
Monitor index creation progress:
|
||||
`select * from pg_stat_progress_create_index;`
|
||||
|
||||
Explore new collection types
|
||||
|
||||
```
|
||||
|
|
41
repo/mst.go
41
repo/mst.go
|
@ -7,6 +7,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"github.com/ipld/go-car"
|
||||
|
@ -295,24 +296,26 @@ func GetRev(ctx context.Context, b io.Reader) (string, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
// GetLang extracts the declared languages and the creation time from the JSON
// content of a post record.
//
// value must be a JSON object whose "$type" is "app.bsky.feed.post". Its
// "langs" array (which may be absent, yielding a nil slice) is returned
// as-is, and its "createdAt" string is parsed as an RFC 3339 timestamp.
//
// An error is returned when value is not valid JSON, when the record is not a
// post, or when "createdAt" is missing or malformed. On error the returned
// slice is nil and the returned time is the zero time.Time, so callers can
// never mistake a failure for a record created "now".
//
// ctx is currently unused.
func GetLang(ctx context.Context, value json.RawMessage) ([]string, time.Time, error) {
	var content struct {
		Type  string   `json:"$type"`
		Langs []string `json:"langs"`
		Time  string   `json:"createdAt"`
	}
	if err := json.Unmarshal(value, &content); err != nil {
		return nil, time.Time{}, fmt.Errorf("failed to extract lang from content: %w", err)
	}
	if content.Type != "app.bsky.feed.post" {
		return nil, time.Time{}, errors.New("not a post")
	}

	timestamp, err := time.Parse(time.RFC3339, content.Time)
	if err != nil {
		return nil, time.Time{}, fmt.Errorf("failed to extract time from content: %w", err)
	}
	return content.Langs, timestamp, nil
}
|
||||
|
|
Loading…
Reference in New Issue