package main import ( "bytes" "context" "encoding/json" "flag" "fmt" "image/jpeg" "io" "log" "log/slog" "net/http" "net/url" "os" "regexp" "strconv" "strings" "time" "git.zio.sh/astra/bsky2tg/bsky" tgbotapi "github.com/OvyFlash/telegram-bot-api" "github.com/bluesky-social/jetstream/pkg/client" "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" "github.com/bluesky-social/jetstream/pkg/models" vidio "git.zio.sh/astra/Vidio" ) const ( serverAddr = "wss://jetstream2.us-west.bsky.network/subscribe" // serverAddr = "wss://stream.zio.blue/subscribe" postFormat = "%s\nā\nš¦ @%s" quotePostFormat = "
%s\nā”ļø @%s\nā\nš¦ @%s" ) type handler struct { seenSeqs map[int64]struct{} tg *tgbotapi.BotAPI bsky *bsky.BSky } var ( post = flag.String("post", "", "URL to a BlueSky post") delete = flag.Bool("delete", false, "true/false to delete post") ) func main() { flag.Parse() var handle = os.Getenv("BSKY_HANDLE") var password = os.Getenv("BSKY_PASSWORD") bskyClient := bsky.NewBSky() err := bskyClient.Auth([]string{handle, password}) if err != nil { log.Fatal(err, ". please set BSKY_HANDLE and BSKY_PASSWORD env variables") } h := &handler{ seenSeqs: make(map[int64]struct{}), bsky: bskyClient, } endpoint := "https://api.telegram.org/bot%s/%s" if os.Getenv("TG_API_ENDPOINT") != "" { endpoint = os.Getenv("TG_API_ENDPOINT") } bot, err := tgbotapi.NewBotAPIWithAPIEndpoint(os.Getenv("TG_TOKEN"), endpoint) if err != nil { panic(err) } h.tg = bot if os.Getenv("TG_CHANNEL_ID") == "" { log.Fatal("TG_CHANNEL_ID is not set") } if *post != "" { r := regexp.MustCompile(`^https:\/\/.*?\/profile\/(.*?)\/post\/(.*?)$`) s := r.FindStringSubmatch(*post) handle := s[1] if s[1][0:4] != "did:" { handle, _ = bskyClient.ResolveHandle(s[1]) } if handle != bskyClient.Bluesky.Cfg.DID { log.Fatal("Unable to send posts from other accounts") } if *delete { r, e := h.bsky.Bluesky.GetTelegramData(s[2]) if e == "" { log.Printf("Found post %s in channel %d, deleting", s[2], r.ChannelID) m := tgbotapi.NewDeleteMessages(r.ChannelID, r.MessageID) h.tg.Send(m) h.bsky.Bluesky.DeleteRecord([]string{s[2], s[1], "blue.zio.bsky2tg.post"}) } else { log.Printf("Unable to find post %s on PDS", s[2]) } return } postJSON := bskyClient.Bluesky.FetchPost(handle, s[2]) p, _ := json.Marshal(postJSON.Record) h.ProcessPost(&models.Event{ Did: postJSON.Author.Did, TimeUS: postJSON.Record.CreatedAt.Unix(), Kind: "", Commit: &models.Commit{ CID: postJSON.Cid, Operation: "create", RKey: strings.Split(postJSON.URI, "/")[4], Collection: "app.bsky.feed.post", Record: p, }, }) return } ctx := context.Background() slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug.Level(), }))) logger := slog.Default() config := client.DefaultClientConfig() config.WebsocketURL = serverAddr config.WantedCollections = []string{"app.bsky.feed.post"} config.WantedDids = []string{bskyClient.Bluesky.Cfg.DID} config.Compress = true scheduler := sequential.NewScheduler("jetstream_localdev", logger, h.HandleEvent) c, err := client.NewClient(config, logger, scheduler) if err != nil { log.Fatalf("failed to create client: %v", err) } cursor := time.Now().UnixMicro() restartCount := 0 loop: if err := c.ConnectAndRead(ctx, &cursor); err != nil { log.Printf("failed to connect: %v. retrying", err) if restartCount >= 3 { log.Print("restart count limit hit, setting cursor to now\n") restartCount = 0 cursor = time.Now().UnixMicro() } else { restartCount += 1 cursor = int64(bskyClient.Bluesky.Cfg.Cursor) } goto loop } } func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { if event.Commit == nil { return nil } switch event.Commit.Operation { case models.CommitOperationCreate, models.CommitOperationUpdate: h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post bsky.PersistAuthSession(h.bsky.Bluesky.Cfg) h.ProcessPost(event) case models.CommitOperationDelete: h.bsky.Bluesky.Cfg.Cursor = event.TimeUS + 1 // +1 to not show same post bsky.PersistAuthSession(h.bsky.Bluesky.Cfg) r, e := h.bsky.Bluesky.GetTelegramData(event.Commit.RKey) if e == "" { m := tgbotapi.NewDeleteMessages(r.ChannelID, r.MessageID) h.tg.Send(m) h.bsky.Bluesky.DeleteRecord([]string{event.Commit.RKey, event.Did, "blue.zio.bsky2tg.post"}) } } return nil } func (h *handler) ProcessPost(event *models.Event) error { ps, _ := h.bsky.ParsePost(event.Commit.Record) po := ps.GetEmbeds() cid, _ := strconv.ParseInt(os.Getenv("TG_CHANNEL_ID"), 10, 64) if ps.IsReply() { //|| ps.IsQuotePost() { // don't want to post replies to channel return nil } var captionText string if ps.IsQuotePost() { if ps.Embed.Record.Type == "app.bsky.embed.record" { handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.Record.URI, "/")[2]) captionText = fmt.Sprintf( quotePostFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), strings.Split(ps.Embed.Record.Record.URI, "/")[2], strings.Split(ps.Embed.Record.Record.URI, "/")[4], handle, event.Did, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } else { handle, _ := h.bsky.GetHandleFromDID(strings.Split(ps.Embed.Record.URI, "/")[2]) captionText = fmt.Sprintf( quotePostFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), strings.Split(ps.Embed.Record.URI, "/")[2], strings.Split(ps.Embed.Record.URI, "/")[4], handle, event.Did, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } } if captionText == "" { if ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()) != "" { captionText = fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } else { captionText = fmt.Sprintf("š¦ @%s", h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle) } } // post has media if len((*po)) != 0 { mediaGroup := []tgbotapi.InputMedia{} if (*po)[0].Type == "external" { tenorGif := tgbotapi.NewInputMediaVideo(tgbotapi.FileURL((*po)[0].URI)) // is most likely gif from Tenor tenorGif.Caption = captionText tenorGif.ParseMode = tgbotapi.ModeHTML mediaGroup = append(mediaGroup, &tenorGif) } else { for _, media := range *po { switch media.Type { case "image": mediaAdd := tgbotapi.NewInputMediaPhoto(tgbotapi.FileURL(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI))) if len(mediaGroup) == 0 { mediaAdd.Caption = captionText mediaAdd.ParseMode = tgbotapi.ModeHTML } mediaGroup = append(mediaGroup, &mediaAdd) case "video": log.Printf("Fetching video: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) resp, _ := http.Get(buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) defer resp.Body.Close() f, _ := os.Create(media.URI + ".mp4") defer f.Close() io.Copy(f, resp.Body) f.Seek(0, 0) mediaAdd := tgbotapi.NewInputMediaVideo(tgbotapi.FileReader{Name: "video.mp4", Reader: f}) metadata, err := getVideoMetadata(f.Name()) if err != nil { log.Printf("Unable to read video metadata: %s\n", buildBlobURL(h.bsky.Bluesky.Cfg.PDSURL, h.bsky.Bluesky.Cfg.DID, media.URI)) break } mediaAdd.SupportsStreaming = true mediaAdd.Height = metadata.Height() mediaAdd.Width = metadata.Width() mediaAdd.Duration = int(metadata.Duration()) frames, _ := metadata.ReadFrames(0) var buf bytes.Buffer jpeg.Encode(&buf, frames[0], &jpeg.Options{Quality: 90}) mediaAdd.Thumb = tgbotapi.FileBytes{Name: "thumb.jpg", Bytes: buf.Bytes()} if len(mediaGroup) == 0 { mediaAdd.Caption = captionText mediaAdd.ParseMode = tgbotapi.ModeHTML } os.Remove(media.URI + ".mp4") mediaGroup = append(mediaGroup, &mediaAdd) } } } if len(mediaGroup) == 0 { log.Print("No mediaGroup to send, see previous error") } else { resp, _ := h.tg.SendMediaGroup(tgbotapi.NewMediaGroup(cid, mediaGroup)) uri, cid := getLink(event) var messageIDs []int for _, msgID := range resp { messageIDs = append(messageIDs, msgID.MessageID) } h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{ ChannelID: resp[0].Chat.ID, MessageID: messageIDs, Link: &bsky.Link{ Cid: cid, URI: uri, }, }, event.Commit.RKey) } } else { m := tgbotapi.MessageConfig{} if captionText == "" { m = tgbotapi.NewMessage(cid, fmt.Sprintf(postFormat, ps.ProcessFacets(h.bsky.Bluesky.FetchAliases()), h.bsky.Bluesky.Cfg.DID, event.Commit.RKey, h.bsky.Bluesky.Cfg.Handle)) } else { m = tgbotapi.NewMessage(cid, captionText) } m.ParseMode = tgbotapi.ModeHTML if ps.IsQuotePost() { m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{ IsDisabled: false, URL: fmt.Sprintf("https://bsky.app/profile/%s/post/%s", strings.Split(ps.Embed.Record.URI, "/")[2], strings.Split(ps.Embed.Record.URI, "/")[4]), PreferSmallMedia: false, PreferLargeMedia: true, ShowAboveText: true, } } else { m.LinkPreviewOptions = tgbotapi.LinkPreviewOptions{IsDisabled: true} } resp, _ := h.tg.Send(m) uri, cid := getLink(event) h.bsky.Bluesky.CommitTelegramResponse(&bsky.TelegramRecord{ ChannelID: resp.Chat.ID, MessageID: []int{resp.MessageID}, Link: &bsky.Link{ Cid: cid, URI: uri, }, }, event.Commit.RKey) } return nil } func buildBlobURL(server string, did string, cid string) string { return server + "/xrpc/com.atproto.sync.getBlob?did=" + url.QueryEscape(did) + "&cid=" + cid } func getLink(event *models.Event) (string, string) { return fmt.Sprintf("at://%s/%s/%s", event.Did, event.Commit.Collection, event.Commit.RKey), event.Commit.CID } func getVideoMetadata(fname string) (video *vidio.Video, err error) { video, err = vidio.NewVideo(fname) return }