package main import ( "bytes" "context" "fmt" "image/jpeg" "io" "log" "log/slog" "net/http" "net/url" "os" "strconv" "strings" "time" "git.zio.sh/astra/bsky2tg/bsky" tgbotapi "github.com/OvyFlash/telegram-bot-api" // apibsky "github.com/bluesky-social/indigo/api/bsky" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" vidio "git.zio.sh/astra/Vidio" ) const ( serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe" // serverAddr = "wss://stream.zio.blue/subscribe" postFormat = "%s\nā\nš¦ @%s" quotePostFormat = "
%s\nā”ļø @%s\nā\nš¦ @%s" ) type handler struct { seenSeqs map[int64]struct{} tg *tgbotapi.BotAPI bsky *bsky.BSky } func main() { var handle = os.Getenv("BSKY_HANDLE") var password = os.Getenv("BSKY_PASSWORD") bskyClient := bsky.NewBSky() err := bskyClient.Auth([]string{handle, password}) if err != nil { log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables") } ctx := context.Background() slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug.Level(), }))) logger := slog.Default() config := client.DefaultClientConfig() config.WebsocketURL = serverAddr config.WantedCollections = []string{"app.bsky.feed.post"} config.WantedDids = []string{bskyClient.Bluesky.Cfg.DID} config.Compress = true h := &handler{ seenSeqs: make(map[int64]struct{}), bsky: bskyClient, } scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) c, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("failed to create client: %v", err) } bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), "https://bot.astra.blue/bot%s/%s") if err != nil { panic(err) } h.tg = bot if os.Getenv("TG_CHANNEL_ID") == "" { log.Fatal("TG_CHANNEL_ID is not set") } // ------------------------------------------------------------------------------ // file, err := os.Open("posts.json") // if err != nil { // fmt.Printf("Error opening file: %v\n", err) // return // } // defer file.Close() // byteValue, err := io.ReadAll(file) // if err != nil { // fmt.Printf("Error reading file: %v\n", err) // return // } // var posts = struct { // Records []struct { // URI string `json:"uri"` // CID string `json:"cid"` // Value *bsky.Post `json:"value"` // } `json:"records"` // }{} // // 4. Unmarshal (decode) the JSON data into the struct // err = json.Unmarshal(byteValue, &posts) // if err != nil { // fmt.Printf("Error unmarshaling JSON: %v\n", err) // return // } // for _, post := range posts.Records { // log.Printf("post: %s\n", post.Value.ProcessFacets()) // s, _ := json.Marshal(post.Value) // h.ProcessPost(&models.Event{Did: bskyClient.Bluesky.Cfg.DID, Commit: &models.Commit{ // Record: s, // RKey: strings.Split(post.URI, "/")[4], // CID: post.CID, // Collection: "app.bsky.feed.post", // }}) // time.Sleep(time.Second * 2) // } // return // ------------------------------------------------------------------------------ cursor := time.Now().UnixMicro() restartCount := 0 loop: if err := c.ConnectAndRead(ctx, &cursor); err != nil { log.Printf("failed to connect: %v. retrying", err) if restartCount >= 3 { log.Print("restart count limit hit, setting cursor to now\n") restartCount = 0 cursor = time.Now().UnixMicro() } else { restartCount += 1 cursor = int64(bskyClient.Bluesky.Cfg.Cursor) } goto loop } } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { if event.Commit == nil { return nil } if event.Commit.Operation == models.CommitOperationCreate || event.Commit.Operation == models.CommitOperationUpdate { h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post bsky.PersistAuthSession(h.bsky.Bluesky.Cfg) h.ProcessPost(event) } else if event.Commit.Operation == models.CommitOperationDelete { h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post bsky.PersistAuthSession(h.bsky.Bluesky.Cfg) r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey) if e == "" { m := tgbotapi.NewDeleteMessage(r.ChannelID, r.MessageID) h.tg.Send(m) h.bsky.Bluesky.DeleteRecord([]string{event.Commit.RKey, event.Did, event.Commit.Collection}) } } return nil } func (h *handler) ProcessPost(event *models.Event) error { ps, _ := h.bsky.ParsePost(event.Commit.Record) po := ps.GetEmbeds() cid, _ := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64) if ps.IsReply() { //|| ps.IsQuotePost() { // don't want to post replies to channel return nil } var captionText string if ps.IsQuotePost() { if ps.Embed.Record.Type == "app.bsky.embed.record" { handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.Record.URI, "/")[2]) captionText = fmt.Sprintf( quotePostFormat, ps.ProcessFacets(), strings.Split(ps.Embed.Record.Record.URI, "/")[2], strings.Split(ps.Embed.Record.Record.URI, "/")[4], handle, event.Did, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } else { handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.URI, "/")[2]) captionText = fmt.Sprintf( quotePostFormat, ps.ProcessFacets(), strings.Split(ps.Embed.Record.URI, "/")[2], strings.Split(ps.Embed.Record.URI, "/")[4], handle, event.Did, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } } if captionText == "" { if ps.ProcessFacets() != "" { captionText = fmt.Sprintf(postFormat, ps.ProcessFacets(), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } else { captionText = fmt.Sprintf("š¦ @%s", h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } } // post has media if len((*po)) != 0 { mediaGroup := []tgbotapi.InputMedia{} if (*po)[0].Type == "external" { tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL((*po)[0].URI)) // is most likely gif from Tenor tenorGif.Caption = captionText tenorGif.ParseMode = tgbotapi.ModeHTML mediaGroup = append(mediaGroup, &tenorGif) } else { for _, media := range *po { switch media.Type { case "image": mediaAdd := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))) if len(mediaGroup) == 0 { mediaAdd.Caption = captionText mediaAdd.ParseMode = tgbotapi.ModeHTML } mediaGroup = append(mediaGroup, &mediaAdd) case "video": log.Printf("Fetching video: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) resp, _ := http.Get(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) defer resp.Body.Close() f, _ := os.Create(media.URI + ".mp4") defer f.Close() io.Copy(f, resp.Body) f.Seek(0, 0) mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileReader{Name: "video.mp4", Reader: f}) metadata, err := getVideoMetadata(f.Name()) if err != nil { log.Printf("Unable to read video metadata: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) break } mediaAdd.SupportsStreaming = true mediaAdd.Height = metadata.Height() mediaAdd.Width = metadata.Width() mediaAdd.Duration = int(metadata.Duration()) frames, _ := metadata.ReadFrames(0) var buf bytes.Buffer jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90}) mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()} if len(mediaGroup) == 0 { mediaAdd.Caption = captionText mediaAdd.ParseMode = tgbotapi.ModeHTML } os.Remove(media.URI + ".mp4") mediaGroup = append(mediaGroup, &mediaAdd) } } } if len(mediaGroup) == 0 { log.Print("No mediaGroup to send, see previous error") } else { resp, _ := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup)) uri, cid := getLink(event) h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{ ChannelID: resp[0].Chat.ID, MessageID: resp[0].MessageID, Link: &bsky.Link{ Cid: cid, URI: uri, }, }, event.Commit.RKey) } } else { m := tgbotapi.MessageConfig{} if captionText == "" { m = tgbotapi.NewMessage(cid, fmt.Sprintf(postFormat, ps.ProcessFacets(), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)) } else { m = tgbotapi.NewMessage(cid, captionText) } m.ParseMode = tgbotapi.ModeHTML if ps.IsQuotePost() { m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{ IsDisabled: false, URL: fmt.Sprintf("https://bsky.app/profile/%s/post/%s", strings.Split(ps.Embed.Record.URI, "/")[2], strings.Split(ps.Embed.Record.URI, "/")[4]), PreferSmallMedia: true, ShowAboveText: true, } } else { m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{IsDisabled: true} } resp, _ := h.tg.Send(m) uri, cid := getLink(event) h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{ ChannelID: resp.Chat.ID, MessageID: resp.MessageID, Link: &bsky.Link{ Cid: cid, URI: uri, }, }, event.Commit.RKey) } return nil } func buildBlobURL(server string, did string, cid string) string { return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + url.QueryEscape(cid) } func getLink(event *models.Event) (string, string) { return fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID } func getVideoMetadata(fname string) (video *vidio.Video, err error) { video, err = vidio.NewVideo(fname) return }